data_distinct.py

# Used to parse table fields
#
# -tbl cts_turkey_im
import sys
import re
import os
import time
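# Derive the repo root from this file's absolute path so the dw_base package
# resolves regardless of where the script is launched from.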
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.log_utils import pretty_print
from dw_base import *
from dw_base.utils.config_utils import parse_args
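# ANSI escape codes for colored console output.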
NORM_RED: str = '\033[0;31m'
NORM_GRN: str = '\033[0;32m'
NORM_YEL: str = '\033[0;33m'
NORM_BLU: str = '\033[0;34m'
NORM_MGT: str = '\033[0;35m'
NORM_CYN: str = '\033[0;36m'


def get_cols_with_type(dt: str):
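    """Parse each table's columns from its CREATE TABLE DDL, then count total
    vs. de-duplicated rows in dwd.<tbl> for partitions ('19700101', dt) and
    report the duplicate ratio."""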
    global_output = {}
    spark = SparkSQL()
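    # Per-job Spark/Hive overrides; dw_base's SparkSQL wrapper is assumed to
    # apply whatever is in this (private) attribute when the session starts.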
    spark._final_spark_config = {
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'spark.yarn.queue': 'cts',
        'spark.sql.crossJoin.enabled': 'true',
        'spark.executor.memory': '6g',
        'spark.executor.memoryOverhead': '2024',
        'spark.driver.memory': '4g',
        'spark.executor.instances': '10',
        'spark.executor.cores': '4',
    }
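    # Alternative: derive the table list from task.mg_count_monitor (the
    # commented-out query below) instead of hard-coding it.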
    # sql = (f"select concat('cts_',mgdb,'_',catalog) as `tbl` from task.mg_count_monitor "
    #        f"where is_deleted = '0'")
    # tbls = spark.query(sql)[0].collect()
    # for tbl2 in tbls:
    #     tbl = tbl2['tbl']
    tbls = [
        'cts_argentina_im',
    ]
    for tbl in tbls:
        sql = f'SHOW CREATE TABLE from_mongo.{tbl}_incr'
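        # Sao Tome and Principe source tables use the abbreviated "stp" name.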
        if tbl == "cts_sao_tome_and_principe_im":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_im_incr"
        if tbl == "cts_sao_tome_and_principe_ex":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_ex_incr"
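        # Pull (column, TYPE) pairs out of the CREATE TABLE statement; the id
        # column must come first and is excluded from the grouping key.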
        ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']
        cols_with_type = re.findall(r'`([^`]+)` ([A-Z]+)', ctbl)
        if cols_with_type[0][0] != 'id':
            sys.exit('Check the table schema: the id column must be the first field')
        fields = {tup[0] for tup in cols_with_type if tup[0] != 'id'}
        # Wrap each field name in backticks
        quoted_fields = [f"`{field}`" for field in fields]
        group_str = ', '.join(quoted_fields)
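        # One query, two result rows: row 0 is the raw row count, row 1 is the
        # number of distinct combinations of all non-id columns.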
        query0 = (f" select count(1) as `all_cnt`,'all_cnt2' from dwd.{tbl} where dt in('19700101','{dt}') "
                  f"union all select count(1) as `unique_cnt`,'dwd.{tbl}' as `tbl` "
                  f"from ( select count(1) as `cnt` FROM (select {group_str} FROM dwd.{tbl} where dt in('19700101','{dt}')) t2 "
                  f"GROUP BY {group_str} ) b ")
        cnt = spark.query(query0)[0].collect()
        all_cnt = cnt[0].all_cnt
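        # UNION ALL keeps the column names of the first branch, so the second
        # row's count is also read through the `all_cnt` field.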
        unique_cnt = cnt[1].all_cnt
        duplicate_cnt = all_cnt - unique_cnt
        # print(all_cnt)
        # print(unique_cnt)
        # print(unique_cnt / all_cnt)
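        # NOTE: assumes the selected partitions are non-empty; all_cnt == 0
        # would raise ZeroDivisionError here.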
        cts_dict = {
            "country": tbl,
            "all_cnt": all_cnt,
            "group_cnt": unique_cnt,
            "duplicate_cnt": duplicate_cnt,
            "duplicate_ratio": (all_cnt - unique_cnt) / all_cnt,
        }
        duplicate_ratio = (all_cnt - unique_cnt) / all_cnt
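        # duplicate_ratio is only consumed by the (currently disabled) insert
        # into tmp.cts_distinct below.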
        # queryend = (f" insert into table tmp.cts_distinct"
        #             f" values('{tbl}','{all_cnt}','{unique_cnt}','{duplicate_cnt}','{duplicate_ratio}') ")
        # spark.query(queryend)[0].collect()
        # print(cts_dict)
        global_output[tbl] = cts_dict
        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                     f'{NORM_MGT} dwd table name: {NORM_GRN} {tbl} '
                     f'{NORM_MGT} metrics for this table: {cts_dict}'
                     )
    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                 f'{NORM_MGT} dwd table name: {NORM_GRN} {tbl} '
                 f'{NORM_MGT} final output: {global_output}'
                 )


if __name__ == '__main__':
    get_cols_with_type('20240812')