import sys
import re

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

# Rules applied when deciding whether rows share an md5 value:
# 1. If any core field is missing, the row is NOT treated as a duplicate and
#    gets a new MD5 value (the `id` column is included in the hash).
# 2. If all core fields are present and at least 2 fields of the
#    quantity/price group are present, the row is treated as a duplicate and
#    shares the original md5 value (the `id` column is excluded from the
#    hash); otherwise it gets a new MD5 value.
# Example argument (presumably passed on the command line):
#   -tbl dim.cts_trade_distinct_end

# Spark/Hive settings assigned to the SparkSQL instance used by get_md5_sql.
_SPARK_CONFIG = {
    'hive.exec.dynamic.partition': 'true',
    'hive.exec.dynamic.partition.mode': 'nonstrict',
    'spark.yarn.queue': 'cts',
    'spark.sql.crossJoin.enabled': 'true',
    'spark.executor.memory': '6g',
    'spark.executor.memoryOverhead': '2048',
    'spark.driver.memory': '4g',
    'spark.executor.instances': "5",
    'spark.executor.cores': '2',
}

# A backtick-quoted identifier followed by a space in SHOW CREATE TABLE output;
# findall returns only the quoted identifier (group 1).
_QUOTED_IDENT_RE = re.compile(r'(`[^`]+`) ')

# Every column reference is qualified with this alias, so the caller's query
# must alias the source table as `i`.
_TABLE_ALIAS = 'i.'
_ID_COLUMN = _TABLE_ALIAS + '`id`'


def _md5_expr(columns):
    """Return a Spark SQL `md5(concat_ws('-', ...))` expression over *columns*.

    Each column is cast to string and NULLs are replaced with the literal
    string "null" before being joined with '-'.
    """
    return 'md5(concat_ws(\'-\',\n {}) ) '.format(
        ', '.join(f'nvl(cast({col} as string), "null")' for col in columns))


def get_md5_sql(tbl):
    """Build the SQL select expression that computes the dedup `md5` column.

    The column list of *tbl* is read from `SHOW CREATE TABLE`. Two md5
    expressions are built: one over all columns, and one over all columns
    except `id`. The returned `if(...)` expression uses the one that includes
    `id` (a unique hash) when the core fields are missing, or when they are
    present but fewer than 2 quantity/price fields are present. Otherwise it
    uses the one without `id`, so identical rows share the hash.

    NOTE(review): the expression calls `check_core_fields` and
    `check_non_core_fields` by name, so they presumably have to be
    registered as Spark SQL UDFs elsewhere; confirm against the caller.

    Args:
        tbl: table name accepted by `SHOW CREATE TABLE`, e.g.
            `dim.cts_trade_distinct_end`.

    Returns:
        A SQL fragment of the form `if(...) as md5` that references columns
        through the table alias `i`.
    """
    spark = SparkSQL()
    # NOTE(review): this assigns a private attribute of SparkSQL; presumably it
    # is read when the session is first created. Confirm against dw_base.
    spark._final_spark_config = dict(_SPARK_CONFIG)
    ddl = spark.query(f'SHOW CREATE TABLE {tbl}')[0].collect()[0]['createtab_stmt']
    columns = [_TABLE_ALIAS + ident for ident in _QUOTED_IDENT_RE.findall(ddl)]

    remove_id_sql = _md5_expr(col for col in columns if col != _ID_COLUMN)
    contain_id_sql = _md5_expr(columns)

    core_check = 'check_core_fields(i.`date`, array(i.jksmc, i.cksmc), array(i.cpms, i.hgbm))'
    non_core_check = 'check_non_core_fields(i.`myzj`, i.`zl`, i.`sl`)'
    return (f'if(({core_check} = 1 AND {non_core_check} = 0) OR {core_check} = 0,'
            f'{contain_id_sql} ,{remove_id_sql} ) as md5')


def _is_present(value):
    """Return True when *value* is not None and its str() form is not blank."""
    return value is not None and str(value).strip() != ''


def check_core_fields(date: str, company_names: list, products: list):
    """Check the core fields of a record.

    The core fields are:
    - the date;
    - at least one company name (importer name or exporter name);
    - at least one product identifier (product description or customs code).

    Args:
        date: the record date. Any value whose str() is non-blank counts as
            present.
        company_names: candidate company-name values. None is treated as an
            empty sequence.
        products: candidate product values (description / customs code).
            None is treated as an empty sequence.

    Returns:
        1 if the date, some company name and some product value are all
        present, otherwise 0.
    """
    if not _is_present(date):
        return 0
    # `is None` (not truthiness) so array-like inputs are not coerced to bool.
    names = () if company_names is None else company_names
    if not any(_is_present(name) for name in names):
        return 0
    items = () if products is None else products
    if not any(_is_present(product) for product in items):
        return 0
    return 1


def check_non_core_fields(total_dollars: str, weights: str, quantities: str):
    """Check the non-core (quantity/price group) fields of a record.

    Args:
        total_dollars: the total amount value.
        weights: the weight value.
        quantities: the quantity value.

    Returns:
        1 if at least 2 of the three values are present (not None and not
        blank after str().strip()), otherwise 0.
    """
    present = sum(1 for value in (total_dollars, weights, quantities) if _is_present(value))
    return 1 if present >= 2 else 0