# Parse a table's fields and measure its duplicate-row ratio.
#
# Example invocation args: -tbl cts_turkey_im
import os
import re
import sys
import time

abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.log_utils import pretty_print
from dw_base import *
from dw_base.utils.config_utils import parse_args

# ANSI color codes for log output
NORM_RED: str = '\033[0;31m'
NORM_GRN: str = '\033[0;32m'
NORM_YEL: str = '\033[0;33m'
NORM_BLU: str = '\033[0;34m'
NORM_MGT: str = '\033[0;35m'
NORM_CYN: str = '\033[0;36m'


def get_cols_with_type(dt: str):
    global_output = {}
    spark = SparkSQL()
    spark._final_spark_config = {
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'spark.yarn.queue': 'cts',
        'spark.sql.crossJoin.enabled': 'true',
        'spark.executor.memory': '6g',
        'spark.executor.memoryOverhead': '2024',
        'spark.driver.memory': '4g',
        'spark.executor.instances': '10',
        'spark.executor.cores': '4',
    }
    # To process every monitored table instead of a fixed list:
    # sql = (f"select concat('cts_', mgdb, '_', catalog) as `tbl` from task.mg_count_monitor "
    #        f"where is_deleted = '0'")
    # tbls = [row['tbl'] for row in spark.query(sql)[0].collect()]
    tbls = [
        'cts_argentina_im',
    ]
    for tbl in tbls:
        sql = f'SHOW CREATE TABLE from_mongo.{tbl}_incr'
        # Sao Tome and Principe tables use the abbreviated name "stp" in from_mongo.
        if tbl == "cts_sao_tome_and_principe_im":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_im_incr"
        if tbl == "cts_sao_tome_and_principe_ex":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_ex_incr"
        ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']
        # Pull (name, TYPE) pairs out of the DDL; partition columns such as
        # `dt` are also captured if they appear backtick-quoted in the statement.
        cols_with_type = re.findall(r'`([^`]+)` ([A-Z]+)', ctbl)
        if cols_with_type[0][0] != 'id':
            sys.exit('Check the table schema: the id column must be the first field')
        # Group on every field except id; the set also deduplicates repeated names.
        fields = {tup[0] for tup in cols_with_type if tup[0] != 'id'}
        # Wrap each field name in backticks
        quoted_fields = [f"`{field}`" for field in fields]
        group_str = ', '.join(quoted_fields)
        # Row 0: total row count; row 1: count of distinct field combinations.
        query0 = (f" select count(1) as `all_cnt`,'all_cnt2' from dwd.{tbl} where dt in('19700101','{dt}') "
                  f"union all select count(1) as `unique_cnt`,'dwd.{tbl}' as `tbl` "
                  f"from ( select count(1) as `cnt` FROM (select {group_str} FROM dwd.{tbl} where dt in('19700101','{dt}')) t2 "
                  f"GROUP BY {group_str} ) b ")
        cnt = spark.query(query0)[0].collect()
        all_cnt = cnt[0].all_cnt
        # UNION ALL takes its column names from the first SELECT, so the
        # distinct count in row 1 is also read through the `all_cnt` field.
        unique_cnt = cnt[1].all_cnt
        duplicate_cnt = all_cnt - unique_cnt
        if all_cnt == 0:
            # Guard against division by zero when both partitions are empty.
            pretty_print(f'{NORM_YEL}dwd.{tbl} is empty for dt in (19700101, {dt}); skipping')
            continue
        cts_dict = {
            "country": tbl,
            "all_cnt": all_cnt,
            "group_cnt": unique_cnt,
            "duplicate_cnt": duplicate_cnt,
            "duplicate_ratio": duplicate_cnt / all_cnt,
        }
        duplicate_ratio = duplicate_cnt / all_cnt
        # To persist the metrics instead of only logging them:
        # queryend = (f" insert into table tmp.cts_distinct"
        #             f" values('{tbl}','{all_cnt}','{unique_cnt}','{duplicate_cnt}','{duplicate_ratio}') ")
        # spark.query(queryend)[0].collect()
        global_output[tbl] = cts_dict
        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                     f'{NORM_MGT} dwd table: {NORM_GRN} {tbl} '
                     f'{NORM_MGT} metrics for this table: {cts_dict}')
    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                 f'{NORM_MGT} dwd table: {NORM_GRN} {tbl} '
                 f'{NORM_MGT} final output: {global_output}')


if __name__ == '__main__':
    get_cols_with_type('20240812')
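

# --- Illustrative sketch (not called by the job; safe to delete) -----------
# A minimal, self-contained demonstration of the DDL-parsing regex used in
# get_cols_with_type. The table and columns below are hypothetical, not a
# real from_mongo schema; the point is only to show how `name` TYPE pairs
# are extracted from a SHOW CREATE TABLE statement and how the partition
# column `dt` gets swept into the grouping set alongside the data columns.
def _demo_parse_cols():
    sample_ddl = (
        "CREATE TABLE `from_mongo.cts_demo_im_incr` (\n"
        "  `id` STRING,\n"
        "  `hs_code` STRING,\n"
        "  `trade_value` BIGINT,\n"
        "  `dt` STRING)\n"
        "PARTITIONED BY (dt)"
    )
    pairs = re.findall(r'`([^`]+)` ([A-Z]+)', sample_ddl)
    # The table name does not match (no uppercase type follows it), so:
    # pairs == [('id', 'STRING'), ('hs_code', 'STRING'),
    #           ('trade_value', 'BIGINT'), ('dt', 'STRING')]
    assert pairs[0][0] == 'id'
    fields = {name for name, _ in pairs if name != 'id'}
    print(sorted(fields))  # ['dt', 'hs_code', 'trade_value']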