# Parses a table's fields and reports duplicate-row metrics.
#
# -tbl cts_turkey_im
import sys
import re
import os
import time

# Make the project root importable no matter where the script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.log_utils import pretty_print
from dw_base import *
from dw_base.utils.config_utils import parse_args
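
# ANSI escape sequences used to colorize the log output below.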
NORM_RED: str = '\033[0;31m'
NORM_GRN: str = '\033[0;32m'
NORM_YEL: str = '\033[0;33m'
NORM_BLU: str = '\033[0;34m'
NORM_MGT: str = '\033[0;35m'
NORM_CYN: str = '\033[0;36m'


def get_cols_with_type(dt: str):
    """For each table, compare total vs. field-wise distinct row counts in dwd for partition dt."""
    global_output = {}
    spark = SparkSQL()
    # Session-level Hive/Spark settings for this job.
    spark._final_spark_config = {
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'spark.yarn.queue': 'cts',
        'spark.sql.crossJoin.enabled': 'true',
        'spark.executor.memory': '6g',
        'spark.executor.memoryOverhead': '2024',  # interpreted as MiB when no unit suffix is given
        'spark.driver.memory': '4g',
        'spark.executor.instances': '10',
        'spark.executor.cores': '4',
    }
    # Alternatively, pull the table list from the monitoring table:
    # sql = (f"select concat('cts_',mgdb,'_',catalog) as `tbl` from task.mg_count_monitor "
    #        f"where is_deleted = '0'")
    # tbls = spark.query(sql)[0].collect()
    # for tbl2 in tbls:
    #     tbl = tbl2['tbl']
    tbls = [
        'cts_argentina_im',
    ]
    for tbl in tbls:
        sql = f'SHOW CREATE TABLE from_mongo.{tbl}_incr'
        # São Tomé and Príncipe is stored under the abbreviated name cts_stp_* in from_mongo.
        if tbl == "cts_sao_tome_and_principe_im":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_im_incr"
        if tbl == "cts_sao_tome_and_principe_ex":
            sql = "SHOW CREATE TABLE from_mongo.cts_stp_ex_incr"
        ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']
        # Pull (column, type) pairs out of the CREATE TABLE statement,
        # e.g. "`id` STRING" -> ('id', 'STRING').
        cols_with_type = re.findall(r'`([^`]+)` ([A-Z]+)', ctbl)
        if cols_with_type[0][0] != 'id':
            sys.exit('Check the table schema: the id column must come first')
        fields = {tup[0] for tup in cols_with_type if tup[0] != 'id'}
        # Backquote every field name so reserved words survive in the SQL below.
        quoted_fields = [f"`{field}`" for field in fields]
        group_str = ', '.join(quoted_fields)
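        # group_str might look like "`hs_code`, `goods_desc`, `trade_date`" for a
        # hypothetical three-field table (the field names here are illustrative only).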
        # First branch counts all rows; second counts distinct field combinations.
        query0 = (f"select count(1) as `all_cnt`, 'all_cnt2' from dwd.{tbl} where dt in ('19700101','{dt}') "
                  f"union all select count(1) as `unique_cnt`, 'dwd.{tbl}' as `tbl` "
                  f"from (select count(1) as `cnt` FROM (select {group_str} FROM dwd.{tbl} where dt in ('19700101','{dt}')) t2 "
                  f"GROUP BY {group_str}) b")
        cnt = spark.query(query0)[0].collect()
        all_cnt = cnt[0].all_cnt
        # UNION ALL keeps the first SELECT's column names, so the second row's
        # count is also addressed as all_cnt.
        unique_cnt = cnt[1].all_cnt
        duplicate_cnt = all_cnt - unique_cnt
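        # With a hypothetical 1,000 total rows and 990 distinct combinations,
        # duplicate_cnt is 10 and the ratio below comes out to 0.01.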
        # Guard against an empty partition before dividing.
        duplicate_ratio = duplicate_cnt / all_cnt if all_cnt else 0.0
        cts_dict = {
            "country": tbl,
            "all_cnt": all_cnt,
            "group_cnt": unique_cnt,
            "duplicate_cnt": duplicate_cnt,
            "duplicate_ratio": duplicate_ratio,
        }
        # Optionally persist the metrics:
        # queryend = (f"insert into table tmp.cts_distinct"
        #             f" values('{tbl}','{all_cnt}','{unique_cnt}','{duplicate_cnt}','{duplicate_ratio}')")
        # spark.query(queryend)[0].collect()
        global_output[tbl] = cts_dict
        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                     f'{NORM_MGT} dwd table: {NORM_GRN} {tbl} '
                     f'{NORM_MGT} metrics for this table: {cts_dict}')
    # Once every table has been processed, print the accumulated metrics.
    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                 f'{NORM_MGT} final output: {global_output}')


if __name__ == '__main__':
    get_cols_with_type('20240812')
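    # A possible extension, assuming parse_args exposes CLI options as a dict
    # (the exact dw_base interface is not shown here, so this is only a sketch):
    # args = parse_args()
    # get_cols_with_type(args.get('dt', '20240812'))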