# country_count_dingtalk.py
# Metrics script: per-country tid/record-count statistics pushed to DingTalk.
# Example arguments: -mgdb kazakhstan -dt 20240304
import sys
import re
import os
import requests
import json

# Locate the repository root: strip everything after the 'tendata-warehouse'
# directory from this file's absolute path and add it to sys.path so the
# local dw_base package is importable regardless of the working directory.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
  13. def send_dingtalk_notification(msg):
  14. headers = {"Content-Type": "application/json"}
  15. data = {
  16. "msgtype": "text",
  17. "text": {"content": msg}
  18. }
  19. json_data = json.dumps(data)
  20. url = f'http://m1.node.cdh/dingtalk/api/robot/send?access_token=72cbdfb0a30fa51defca1dcba1c7b68feaace79c08e69da8cf9a7ea321481b06'
  21. # 下面的url用于测试
  22. # url = f'http://m1.node.cdh/dingtalk/api/robot/send?access_token=89974c66ec5a33c67acd71c0544fe323dd76c5d7a6f0b92acd09175745b737a0'
  23. response = requests.post(url=url, data=json_data, headers=headers)
  24. response.raise_for_status()
  25. def main():
  26. # 解析命令行参数
  27. CONFIG, _ = parse_args(sys.argv[1:])
  28. mgdb = CONFIG.get('mgdb')
  29. dt = CONFIG.get('dt')
  30. with SparkSQL() as spark:
  31. country_im_colm = {
  32. 'russia': 'shrmc',
  33. 'india': 'jksmc',
  34. 'india_exp': 'jksmc',
  35. 'vietnam': 'jksmc',
  36. 'turkey': 'jksmc',
  37. 'kazakhstan': 'jksmc',
  38. 'mexico': 'jksmc',
  39. 'mexico_bol': 'jksmc'
  40. }
  41. country_ex_colm = {
  42. 'russia': 'fhrmc',
  43. 'india': 'cksmc',
  44. 'india_exp': 'cksmc',
  45. 'vietnam': 'cksmc',
  46. 'turkey': 'cksmc',
  47. 'kazakhstan': 'cksmc',
  48. 'mexico': 'cksmc',
  49. 'mexico_bol': 'cksmc'
  50. }
  51. sql_query1 = (f"select count(1) AS total_tid_count from ( select id "
  52. f"from (select jkstid as id "
  53. f" from dwd.cts_{mgdb}_im "
  54. f" where dt in ('19700101', '20240303') "
  55. f" union all "
  56. f" select ckstid as id "
  57. f" from dwd.cts_{mgdb}_ex "
  58. f" where dt in ('19700101', '20240303')) a "
  59. f"group by id)b ")
  60. res = spark.query(sql_query1)[0].collect()
  61. cnt1 = res[0]['total_tid_count']
  62. sql_query2 = (f"select count(1) AS total_tid_count "
  63. f"from ( "
  64. f"select id "
  65. f"from (select id, count(1) "
  66. f" from (select jkstid as id, {country_im_colm[mgdb]} as mc "
  67. f" from dwd.cts_{mgdb}_im "
  68. f" where dt in ('19700101', '20240303') "
  69. f" union all "
  70. f" select ckstid as id, {country_ex_colm[mgdb]} as mc "
  71. f" from dwd.cts_{mgdb}_ex "
  72. f" where dt in ('19700101', '20240303')) a "
  73. f" group by id, mc) b "
  74. f"group by id "
  75. f"having count(1) = 1)b ")
  76. res = spark.query(sql_query2)[0].collect()
  77. cnt2 = res[0]['total_tid_count']
  78. sql_query3 = (f"select count(1) AS total_tid_count "
  79. f"from ( "
  80. f"select id "
  81. f"from (select id "
  82. f" from (select jkstid as id, {country_im_colm[mgdb]} as mc "
  83. f" from dwd.cts_{mgdb}_im "
  84. f" where dt in ('19700101', '20240303') "
  85. f" union all "
  86. f" select ckstid as id, {country_ex_colm[mgdb]} as mc "
  87. f" from dwd.cts_{mgdb}_ex "
  88. f" where dt in ('19700101', '20240303')) a "
  89. f" group by id, mc) b "
  90. f"group by id "
  91. f"having count(1) > 1)b ")
  92. res = spark.query(sql_query3)[0].collect()
  93. cnt3 = res[0]['total_tid_count']
  94. sql_query4 = (f"select (select count(1) "
  95. f" from dwd.cts_{mgdb}_im "
  96. f" where dt in ('19700101', '20240303')) + "
  97. f" (select count(1) "
  98. f" from dwd.cts_{mgdb}_ex "
  99. f" where dt in ('19700101', '20240303')) as total_tid_count ")
  100. res = spark.query(sql_query4)[0].collect()
  101. cnt4 = res[0]['total_tid_count']
  102. sql_query5 = (f"select count(1) AS total_tid_count "
  103. f"from (select jkstid as id "
  104. f" from dwd.cts_{mgdb}_im "
  105. f" where dt in ('19700101', '20240303') "
  106. f" union all "
  107. f" select ckstid as id "
  108. f" from dwd.cts_{mgdb}_ex "
  109. f" where dt in ('19700101', '20240303'))c "
  110. f"where id in (select id "
  111. f" from (select id "
  112. f" from (select jkstid as id, {country_im_colm[mgdb]} as mc "
  113. f" from dwd.cts_{mgdb}_im "
  114. f" where dt in ('19700101', '20240303') "
  115. f" union all "
  116. f" select ckstid as id, {country_ex_colm[mgdb]} as mc "
  117. f" from dwd.cts_{mgdb}_ex "
  118. f" where dt in ('19700101', '20240303')) a "
  119. f" group by id, mc) b "
  120. f" group by id "
  121. f" having count(1) = 1) ")
  122. res = spark.query(sql_query5)[0].collect()
  123. cnt5 = res[0]['total_tid_count']
  124. sql_query6 = (f"select count(1) AS total_tid_count "
  125. f"from (select jkstid as id "
  126. f" from dwd.cts_{mgdb}_im "
  127. f" where dt in ('19700101', '20240303') "
  128. f" union all "
  129. f" select ckstid as id "
  130. f" from dwd.cts_{mgdb}_ex "
  131. f" where dt in ('19700101', '20240303'))c "
  132. f"where id in (select id "
  133. f" from (select id "
  134. f" from (select jkstid as id, {country_im_colm[mgdb]} as mc "
  135. f" from dwd.cts_{mgdb}_im "
  136. f" where dt in ('19700101', '20240303') "
  137. f" union all "
  138. f" select ckstid as id, {country_ex_colm[mgdb]} as mc "
  139. f" from dwd.cts_{mgdb}_ex "
  140. f" where dt in ('19700101', '20240303')) a "
  141. f" group by id, mc) b "
  142. f" group by id "
  143. f" having count(1) > 1) ")
  144. res = spark.query(sql_query6)[0].collect()
  145. cnt6 = res[0]['total_tid_count']
  146. sql_query7 = (f"select count(1) AS total_tid_count "
  147. f"from (select jkstid as id "
  148. f" from dwd.cts_{mgdb}_im "
  149. f" where dt in ('19700101', '20240303') "
  150. f" union all "
  151. f" select ckstid as id "
  152. f" from dwd.cts_{mgdb}_ex "
  153. f" where dt in ('19700101', '20240303'))c "
  154. f"where id is null ")
  155. res = spark.query(sql_query7)[0].collect()
  156. cnt7 = res[0]['total_tid_count']
  157. msg = (f"{mgdb}数据量指标 \n"
  158. f"-----------------------------------\n"
  159. f"{mgdb}进出口统计:\n\n"
  160. f"总tid数量:\t\t\t{cnt1}\n"
  161. f"一对一的tid数量:\t\t{cnt2}\n"
  162. f"一对多的tid数量:\t\t{cnt3}\n\n"
  163. f"详单总数据量:\t\t{cnt4}\n"
  164. f"一对一的tid的详单数量:\t{cnt5}\n"
  165. f"一对多的tid的详单数量:\t{cnt6}\n"
  166. f"tid为空的详单数量:\t\t{cnt7}\n"
  167. f" \n"
  168. )
  169. send_dingtalk_notification(msg)
# Script entry point.
if __name__ == '__main__':
    main()