# Metrics
# Example arguments: -mgdb kazakhstan -dt 20240304
import sys
import re
import os
import requests
import json

# Make the repository root importable so the `dw_base` package resolves
# regardless of the directory the script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

# Column selected as `mc` from dwd.cts_<mgdb>_im, keyed by the `mgdb` argument.
_IM_NAME_COLUMNS = {
    'russia': 'shrmc',
    'india': 'jksmc',
    'india_exp': 'jksmc',
    'vietnam': 'jksmc',
    'turkey': 'jksmc',
    'kazakhstan': 'jksmc',
    'mexico': 'jksmc',
    'mexico_bol': 'jksmc',
}

# Column selected as `mc` from dwd.cts_<mgdb>_ex, keyed by the `mgdb` argument.
_EX_NAME_COLUMNS = {
    'russia': 'fhrmc',
    'india': 'cksmc',
    'india_exp': 'cksmc',
    'vietnam': 'cksmc',
    'turkey': 'cksmc',
    'kazakhstan': 'cksmc',
    'mexico': 'cksmc',
    'mexico_bol': 'cksmc',
}

# Partition predicate shared by every query.
# NOTE(review): the script reads a `dt` argument but the partitions queried
# here are fixed; confirm whether '20240303' should be derived from `dt`.
_PARTITION_FILTER = "dt in ('19700101', '20240303')"

_COUNT_COLUMN = 'total_tid_count'


def send_dingtalk_notification(msg, timeout=10):
    """Post ``msg`` as a plain-text message to the DingTalk robot webhook.

    Args:
        msg: Text content of the notification.
        timeout: Seconds to wait for the webhook before giving up, so that a
            stalled endpoint cannot hang the job forever.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {"content": msg}
    }
    json_data = json.dumps(data)
    # NOTE(review): the access token is committed in source; consider moving
    # it to configuration.
    url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=72cbdfb0a30fa51defca1dcba1c7b68feaace79c08e69da8cf9a7ea321481b06'
    # The URL below is for testing
    # url = f'http://m1.node.cdh/dingtalk/api/robot/send?access_token=89974c66ec5a33c67acd71c0544fe323dd76c5d7a6f0b92acd09175745b737a0'
    response = requests.post(url=url, data=json_data, headers=headers,
                             timeout=timeout)
    response.raise_for_status()


def _id_union_sql(mgdb, with_name=False):
    """Return SQL unioning the tid rows of the import and export tables.

    Each row exposes the tid as ``id``; with ``with_name`` the mapped name
    column is additionally exposed as ``mc``.
    """
    im_columns = "jkstid as id"
    ex_columns = "ckstid as id"
    if with_name:
        im_columns += f", {_IM_NAME_COLUMNS[mgdb]} as mc"
        ex_columns += f", {_EX_NAME_COLUMNS[mgdb]} as mc"
    return (f"select {im_columns} "
            f"from dwd.cts_{mgdb}_im "
            f"where {_PARTITION_FILTER} "
            f"union all "
            f"select {ex_columns} "
            f"from dwd.cts_{mgdb}_ex "
            f"where {_PARTITION_FILTER}")


def _ids_by_name_count_sql(mgdb, comparison):
    """Return SQL selecting ids whose number of (id, mc) groups matches.

    ``comparison`` is the right-hand side of the ``having count(1)`` clause,
    e.g. ``"= 1"`` or ``"> 1"``.
    """
    return (f"select id "
            f"from (select id "
            f"from ({_id_union_sql(mgdb, with_name=True)}) a "
            f"group by id, mc) b "
            f"group by id "
            f"having count(1) {comparison}")


def _build_queries(mgdb):
    """Return the seven metric queries in the order they are reported."""
    all_ids = _id_union_sql(mgdb)
    one_to_one = _ids_by_name_count_sql(mgdb, "= 1")
    one_to_many = _ids_by_name_count_sql(mgdb, "> 1")
    return [
        # 1. Number of distinct ids.
        (f"select count(1) AS {_COUNT_COLUMN} "
         f"from (select id from ({all_ids}) a group by id) b"),
        # 2. Ids that appear with exactly one mc value.
        f"select count(1) AS {_COUNT_COLUMN} from ({one_to_one}) b",
        # 3. Ids that appear with more than one mc value.
        f"select count(1) AS {_COUNT_COLUMN} from ({one_to_many}) b",
        # 4. Total rows across the import and export tables.
        (f"select (select count(1) "
         f"from dwd.cts_{mgdb}_im "
         f"where {_PARTITION_FILTER}) + "
         f"(select count(1) "
         f"from dwd.cts_{mgdb}_ex "
         f"where {_PARTITION_FILTER}) as {_COUNT_COLUMN}"),
        # 5. Rows whose id appears with exactly one mc value.
        (f"select count(1) AS {_COUNT_COLUMN} "
         f"from ({all_ids}) c "
         f"where id in ({one_to_one})"),
        # 6. Rows whose id appears with more than one mc value.
        (f"select count(1) AS {_COUNT_COLUMN} "
         f"from ({all_ids}) c "
         f"where id in ({one_to_many})"),
        # 7. Rows whose id is null.
        (f"select count(1) AS {_COUNT_COLUMN} "
         f"from ({all_ids}) c "
         f"where id is null"),
    ]


def _run_count(spark, sql):
    """Execute ``sql`` and return the count from its first result row."""
    rows = spark.query(sql)[0].collect()
    return rows[0][_COUNT_COLUMN]


def main():
    """Compute the tid metrics for one ``mgdb`` and send them to DingTalk.

    Raises:
        ValueError: If the ``mgdb`` argument is missing or not one of the
            supported values.
    """
    # Parse command-line arguments
    CONFIG, _ = parse_args(sys.argv[1:])
    mgdb = CONFIG.get('mgdb')

    # Validate before starting Spark: `mgdb` is interpolated into table names,
    # so only the known values may reach the SQL text.
    if mgdb not in _IM_NAME_COLUMNS or mgdb not in _EX_NAME_COLUMNS:
        supported = ', '.join(sorted(_IM_NAME_COLUMNS))
        raise ValueError(
            f"unsupported mgdb {mgdb!r}; expected one of: {supported}")

    queries = _build_queries(mgdb)
    with SparkSQL() as spark:
        cnt1, cnt2, cnt3, cnt4, cnt5, cnt6, cnt7 = [
            _run_count(spark, sql) for sql in queries
        ]

    msg = (f"{mgdb}数据量指标 \n"
           f"-----------------------------------\n"
           f"{mgdb}进出口统计:\n\n"
           f"总tid数量:\t\t\t{cnt1}\n"
           f"一对一的tid数量:\t\t{cnt2}\n"
           f"一对多的tid数量:\t\t{cnt3}\n\n"
           f"详单总数据量:\t\t{cnt4}\n"
           f"一对一的tid的详单数量:\t{cnt5}\n"
           f"一对多的tid的详单数量:\t{cnt6}\n"
           f"tid为空的详单数量:\t\t{cnt7}\n"
           f" \n"
           )
    send_dingtalk_notification(msg)


if __name__ == '__main__':
    main()