| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
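# 指标 / Metrics: tid and detail-record counts for dwd.cts_<mgdb>_im and
# dwd.cts_<mgdb>_ex, posted to a DingTalk robot.
# 参数示例 / Example arguments: -mgdb kazakhstan -dt 20240304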
- import sys
- import re
- import os
- import requests
- import json
- abspath = os.path.abspath(__file__)
- root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
- sys.path.append(root_path)
- from dw_base.spark.spark_sql import SparkSQL
- from dw_base.utils.config_utils import parse_args
def send_dingtalk_notification(msg, url=None, timeout=10):
    """Post ``msg`` as a plain-text message to a DingTalk robot endpoint.

    Args:
        msg: Text to send as the message content.
        url: Robot webhook URL. When omitted, the ``DINGTALK_ROBOT_URL``
            environment variable is used if set; otherwise the production
            robot URL this script has always posted to. Pass (or export) the
            test robot's URL here instead of editing the code.
        timeout: Seconds to wait for the HTTP request. Without a timeout a
            stalled endpoint would block the job indefinitely.

    Raises:
        requests.HTTPError: If the endpoint responds with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    if url is None:
        # NOTE(review): an access token is committed in source; consider
        # moving it into configuration / the environment variable only.
        url = os.environ.get(
            'DINGTALK_ROBOT_URL',
            'http://m1.node.cdh/dingtalk/api/robot/send?access_token=72cbdfb0a30fa51defca1dcba1c7b68feaace79c08e69da8cf9a7ea321481b06',
        )
    headers = {"Content-Type": "application/json"}
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
    }
    response = requests.post(url=url, data=json.dumps(payload),
                             headers=headers, timeout=timeout)
    response.raise_for_status()
# Name column paired with the import-side tid (jkstid) for each supported
# database; presumably the importer/consignee name — confirm against the schema.
_IM_NAME_COLUMN = {
    'russia': 'shrmc',
    'india': 'jksmc',
    'india_exp': 'jksmc',
    'vietnam': 'jksmc',
    'turkey': 'jksmc',
    'kazakhstan': 'jksmc',
    'mexico': 'jksmc',
    'mexico_bol': 'jksmc',
}

# Name column paired with the export-side tid (ckstid) for each supported
# database; presumably the exporter/shipper name — confirm against the schema.
_EX_NAME_COLUMN = {
    'russia': 'fhrmc',
    'india': 'cksmc',
    'india_exp': 'cksmc',
    'vietnam': 'cksmc',
    'turkey': 'cksmc',
    'kazakhstan': 'cksmc',
    'mexico': 'cksmc',
    'mexico_bol': 'cksmc',
}

# Partition the script used to hard-code; kept only as a fallback when -dt is
# not supplied so existing invocations keep working.
_LEGACY_DT = '20240303'


def _validate_args(mgdb, dt):
    """Reject values that would build invalid (or injected) SQL; raise ValueError."""
    if mgdb not in _IM_NAME_COLUMN:
        raise ValueError(
            f"unsupported mgdb {mgdb!r}; expected one of {sorted(_IM_NAME_COLUMN)}")
    if not re.fullmatch(r"\d{8}", dt):
        raise ValueError(f"dt must be an 8-digit yyyyMMdd date, got {dt!r}")


def _union_sql(mgdb, partitions, with_name):
    """Return a UNION ALL of import tids (jkstid) and export tids (ckstid) as ``id``.

    When ``with_name`` is true, each row also carries that side's name column
    as ``mc``.
    """
    im_cols = 'jkstid as id'
    ex_cols = 'ckstid as id'
    if with_name:
        im_cols += f', {_IM_NAME_COLUMN[mgdb]} as mc'
        ex_cols += f', {_EX_NAME_COLUMN[mgdb]} as mc'
    return (f"select {im_cols} from dwd.cts_{mgdb}_im where dt in {partitions} "
            f"union all "
            f"select {ex_cols} from dwd.cts_{mgdb}_ex where dt in {partitions}")


def _ids_by_name_count_sql(mgdb, partitions, having):
    """Return a subquery of tids whose number of distinct names satisfies ``having``.

    ``having`` is a comparison suffix such as ``'= 1'`` (one-to-one) or
    ``'> 1'`` (one-to-many). A NULL tid or NULL name forms its own group, as
    with any SQL GROUP BY.
    """
    named_rows = _union_sql(mgdb, partitions, with_name=True)
    return (f"select id from (select id from ({named_rows}) a group by id, mc) b "
            f"group by id having count(1) {having}")


def _build_metric_queries(mgdb, dt):
    """Return the seven metric queries, each yielding one ``total_tid_count`` column."""
    partitions = f"('19700101', '{dt}')"
    rows = _union_sql(mgdb, partitions, with_name=False)
    single = _ids_by_name_count_sql(mgdb, partitions, '= 1')
    multi = _ids_by_name_count_sql(mgdb, partitions, '> 1')
    return [
        # 1. distinct tids (a NULL tid counts as one group)
        f"select count(1) as total_tid_count from "
        f"(select id from ({rows}) a group by id) b",
        # 2. tids mapped to exactly one name
        f"select count(1) as total_tid_count from ({single}) b",
        # 3. tids mapped to more than one name
        f"select count(1) as total_tid_count from ({multi}) b",
        # 4. total detail rows across both tables
        f"select (select count(1) from dwd.cts_{mgdb}_im where dt in {partitions}) + "
        f"(select count(1) from dwd.cts_{mgdb}_ex where dt in {partitions}) "
        f"as total_tid_count",
        # 5. detail rows whose tid is one-to-one (IN never matches a NULL tid)
        f"select count(1) as total_tid_count from ({rows}) c where id in ({single})",
        # 6. detail rows whose tid is one-to-many
        f"select count(1) as total_tid_count from ({rows}) c where id in ({multi})",
        # 7. detail rows with a NULL tid
        f"select count(1) as total_tid_count from ({rows}) c where id is null",
    ]


def main():
    """Compute tid / detail-row metrics for one database and send them to DingTalk.

    Command-line arguments (parsed by ``parse_args``):
        -mgdb  database suffix of dwd.cts_<mgdb>_im / _ex; must be a key of
               ``_IM_NAME_COLUMN``.
        -dt    yyyyMMdd partition read together with the '19700101'
               partition. Previously ignored (the job always read 20240303).
               Falls back to that legacy partition when omitted.

    NOTE(review): the usage example passes -dt 20240304 while the old
    hard-coded partition was 20240303; confirm whether dt is meant to be the
    partition date itself (as implemented) or the run date (T-1).

    Raises:
        ValueError: If mgdb is unsupported or dt is not an 8-digit date.
    """
    config, _ = parse_args(sys.argv[1:])
    mgdb = config.get('mgdb')
    dt = config.get('dt')
    dt = _LEGACY_DT if dt is None else str(dt)
    # Validate before touching Spark so bad arguments fail fast and never
    # reach the SQL text.
    _validate_args(mgdb, dt)
    queries = _build_metric_queries(mgdb, dt)

    with SparkSQL() as spark:
        cnt1, cnt2, cnt3, cnt4, cnt5, cnt6, cnt7 = [
            spark.query(sql)[0].collect()[0]['total_tid_count'] for sql in queries
        ]
        msg = (f"{mgdb}数据量指标 \n"
               f"-----------------------------------\n"
               f"{mgdb}进出口统计:\n\n"
               f"总tid数量:\t\t\t{cnt1}\n"
               f"一对一的tid数量:\t\t{cnt2}\n"
               f"一对多的tid数量:\t\t{cnt3}\n\n"
               f"详单总数据量:\t\t{cnt4}\n"
               f"一对一的tid的详单数量:\t{cnt5}\n"
               f"一对多的tid的详单数量:\t{cnt6}\n"
               f"tid为空的详单数量:\t\t{cnt7}\n"
               f" \n"
               )
        send_dingtalk_notification(msg)
if __name__ == "__main__":
    # Execute the metrics job only when run as a script, not on import.
    main()
|