calculation_repetition_trade.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import sys
  2. import re
  3. from dw_base.spark.spark_sql import SparkSQL
  4. from dw_base.utils.config_utils import parse_args
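# The rules below are applied when records share the same MD5 value:
# 1. Core fields missing: the record is treated as NOT a duplicate and a new MD5 value is generated.
# 2. Core fields present: if at least two of the quantity/price fields are filled in, the record is
#    treated as a duplicate and shares the original MD5 value; otherwise a new MD5 value is generated.
# Example argument: -tbl dim.cts_trade_distinct_end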
  9. def get_md5_sql(tbl):
  10. sql = f'SHOW CREATE TABLE {tbl}'
  11. spark = SparkSQL()
  12. spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
  13. 'hive.exec.dynamic.partition.mode': 'nonstrict',
  14. 'spark.yarn.queue': 'cts',
  15. 'spark.sql.crossJoin.enabled': 'true',
  16. 'spark.executor.memory': '6g',
  17. 'spark.executor.memoryOverhead': '2048',
  18. 'spark.driver.memory': '4g',
  19. 'spark.executor.instances': "5",
  20. 'spark.executor.cores': '2'
  21. }
  22. ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']
  23. cols_list = re.findall(r'(`[^`]+`) ', ctbl)
  24. suff = 'i.'
  25. modified_list = [suff + s for s in cols_list]
  26. remove_id_sql = 'md5(concat_ws(\'-\',\n {}) ) '.format(
  27. ', '.join([f'nvl(cast({col} as string), "null")' for col in modified_list if col != "i.`id`"]))
  28. contain_id_sql = 'md5(concat_ws(\'-\',\n {}) ) '.format(
  29. ', '.join([f'nvl(cast({col} as string), "null")' for col in modified_list]))
  30. sel_end = (
  31. f"if((check_core_fields(i.`date`, array(i.jksmc, i.cksmc), array(i.cpms, i.hgbm)) = 1 AND"
  32. f" check_non_core_fields(i.`myzj`, i.`zl`, i.`sl`) = 0) OR "
  33. f"check_core_fields(i.`date`, array(i.jksmc, i.cksmc), array(i.cpms, i.hgbm)) = 0,"
  34. f"{contain_id_sql} ,{remove_id_sql} ) as md5")
  35. return sel_end
  36. def check_core_fields(date: str, company_names: list, products: list):
  37. """
  38. 检查核心字段 date,进口商名称或出口商名称,产品描述或海关编码
  39. Args:
  40. total_dollars:
  41. weights:
  42. quantities:
  43. Returns:
  44. """
  45. if date is None or str(date).strip() == '':
  46. return 0
  47. if not any(name is not None and str(name).strip() != '' for name in company_names):
  48. return 0
  49. if not any(product is not None and str(product).strip() != '' for product in products):
  50. return 0
  51. return 1
  52. def check_non_core_fields(total_dollars: str, weights: str, quantities: str):
  53. """
  54. 检查非核心字段
  55. Args:
  56. total_dollars:
  57. weights:
  58. quantities:
  59. Returns:
  60. """
  61. non_empty_count = 0
  62. if total_dollars is not None and str(total_dollars).strip() != '':
  63. non_empty_count += 1
  64. if weights is not None and str(weights).strip() != '':
  65. non_empty_count += 1
  66. if quantities is not None and str(quantities).strip() != '':
  67. non_empty_count += 1
  68. return 1 if non_empty_count >= 2 else 0