| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import sys
- import re
- from dw_base.spark.spark_sql import SparkSQL
- from dw_base.utils.config_utils import parse_args
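# Decision rule for rows whose content md5 values are identical:
# 1. If any core field (date, importer/exporter name, product description/customs code)
#    is missing, the row is NOT considered a duplicate and gets a new MD5 value.
# 2. If no core field is missing and at least 2 of the 3 quantity/price fields
#    (myzj, zl, sl) are filled, the row is considered a duplicate and shares the
#    original md5 value; otherwise it gets a new MD5 value.
# Example argument: -tbl dim.cts_trade_distinct_end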
# Matches a backtick-quoted identifier followed by a space, e.g. "`id` STRING".
_QUOTED_COLUMN_RE = re.compile(r'(`[^`]+`) ')


def _md5_of_columns(columns):
    """Return a Spark SQL ``md5(concat_ws('-', ...))`` expression over *columns*.

    Each column is cast to string and NULLs are replaced by the literal
    string "null" so they still contribute to the hash (note that a genuine
    "null" string value therefore hashes the same as NULL).
    """
    return "md5(concat_ws('-',\n {}) ) ".format(
        ', '.join(f'nvl(cast({col} as string), "null")' for col in columns))


def get_md5_sql(tbl):
    """Build the ``if(...) as md5`` select expression used to deduplicate *tbl*.

    Column names are read from the table's ``SHOW CREATE TABLE`` statement and
    referenced as ``i.`col```, so the enclosing query must alias *tbl* as
    ``i``. The expression calls ``check_core_fields`` and
    ``check_non_core_fields`` by name, so both must be available as SQL
    functions in the session that runs it (presumably registered as UDFs from
    the Python functions of the same name; confirm at the call site).

    Branches of the generated expression:
      * ``check_core_fields(...) = 0``, or ``= 1`` together with
        ``check_non_core_fields(i.myzj, i.zl, i.sl) = 0``: md5 over all
        columns including ``id``. This value is unique per row, assuming
        ``id`` is unique, so the row is "not a duplicate".
      * otherwise: md5 over every column except ``id``. Rows with identical
        content share this value, so they count as "duplicates".

    Side effect: creates a ``SparkSQL`` wrapper and sets its Spark
    configuration before running the ``SHOW CREATE TABLE`` query.

    Args:
        tbl: Table name, e.g. ``dim.cts_trade_distinct_end``. It is
            interpolated verbatim into SQL and must come from a trusted source.

    Returns:
        The SQL select-list expression, ending in ``as md5``.

    Raises:
        ValueError: If no quoted column can be parsed from the DDL, or if the
            table has no ``id`` column. Without ``id``, both md5 branches
            would be identical and no row could receive a new MD5.
    """
    sql = f'SHOW CREATE TABLE {tbl}'
    spark = SparkSQL()
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true',
                                 'spark.executor.memory': '6g',
                                 'spark.executor.memoryOverhead': '2048',
                                 'spark.driver.memory': '4g',
                                 'spark.executor.instances': "5",
                                 'spark.executor.cores': '2'
                                 }
    ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']

    # Scan only from the opening "(" of the column list. The quoted table
    # identifier in front of it (e.g. "`db`.`tbl` (") may otherwise match
    # the pattern and be mistaken for a column.
    start = ctbl.find('(')
    cols_list = _QUOTED_COLUMN_RE.findall(ctbl[start:] if start >= 0 else ctbl)
    if not cols_list:
        raise ValueError(f'no quoted columns found in SHOW CREATE TABLE output for {tbl}')

    modified_list = ['i.' + col for col in cols_list]
    id_col = 'i.`id`'
    if id_col not in modified_list:
        raise ValueError(f'table {tbl} has no `id` column; cannot build a per-row md5')

    contain_id_sql = _md5_of_columns(modified_list)
    remove_id_sql = _md5_of_columns([col for col in modified_list if col != id_col])

    core_check = 'check_core_fields(i.`date`, array(i.jksmc, i.cksmc), array(i.cpms, i.hgbm))'
    non_core_check = 'check_non_core_fields(i.`myzj`, i.`zl`, i.`sl`)'
    return (f'if(({core_check} = 1 AND {non_core_check} = 0) OR {core_check} = 0,'
            f'{contain_id_sql} ,{remove_id_sql} ) as md5')
def _has_value(value):
    """Return True when *value* is not None and not blank once converted to str."""
    return value is not None and str(value).strip() != ''


def check_core_fields(date: str, company_names: list, products: list):
    """Check the core fields of a record: date, company name and product.

    A value counts as present when it is not None and its ``str()`` form is
    non-empty after stripping whitespace.

    Args:
        date: The record's date value.
        company_names: Importer-name / exporter-name values. At least one must
            be present. ``None`` is treated like an empty sequence.
        products: Product-description / customs-code values. At least one must
            be present. ``None`` is treated like an empty sequence.

    Returns:
        1 if the date, at least one company name and at least one product value
        are all present, otherwise 0. The result is an int rather than a bool
        because the SQL built by ``get_md5_sql`` compares it with ``= 1`` /
        ``= 0``.
    """
    if not _has_value(date):
        return 0
    # ``or ()`` makes a missing (None) list behave like an empty one instead of
    # raising TypeError during iteration.
    if not any(_has_value(name) for name in company_names or ()):
        return 0
    if not any(_has_value(product) for product in products or ()):
        return 0
    return 1
def check_non_core_fields(total_dollars: str, weights: str, quantities: str):
    """Check whether enough of the quantity/price fields are filled.

    A value counts as filled when it is not None and its ``str()`` form is
    non-empty after stripping whitespace. In ``get_md5_sql`` the arguments are
    the ``myzj``, ``zl`` and ``sl`` columns, in that order.

    Args:
        total_dollars: Total amount value.
        weights: Weight value.
        quantities: Quantity value.

    Returns:
        1 when at least two of the three values are filled, otherwise 0.
    """
    filled = sum(
        1
        for value in (total_dollars, weights, quantities)
        if value is not None and str(value).strip() != ''
    )
    if filled < 2:
        return 0
    return 1
|