# dingtalk_mirror_monitor.py
  1. # 用于钉钉监控T+1任务是否需要重跑
  2. import sys
  3. import re
  4. import os
  5. import requests
  6. import json
  7. abspath = os.path.abspath(__file__)
  8. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  9. sys.path.append(root_path)
  10. from dw_base.spark.spark_sql import SparkSQL
  11. from dw_base.utils.log_utils import pretty_print
  12. from configparser import ConfigParser
  13. from datetime import time
  14. from pymongo import MongoClient
  15. from dw_base import *
  16. from dw_base.scheduler.polling_scheduler import get_mongo_client
  17. from dw_base.utils.config_utils import parse_args
  18. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  19. from dw_base.scheduler.mg2es.es_operator import ESOperator
  20. from elasticsearch.exceptions import NotFoundError
  21. call_count = 0
  22. def check_call_count():
  23. global call_count
  24. if call_count == 0:
  25. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  26. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 1 '
  27. f'{NORM_MGT} call_count =>{call_count}')
  28. print('${setValue(is_run=%s)}' % '1')
  29. else:
  30. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  31. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 0 '
  32. f'{NORM_MGT} call_count =>{call_count}')
  33. print('${setValue(is_run=%s)}' % '0')
  34. def send_dingtalk_notification(msg):
  35. global call_count
  36. call_count += 1
  37. headers = {"Content-Type": "application/json"}
  38. data = {
  39. "msgtype": "text",
  40. "text": {"content": msg}
  41. }
  42. json_data = json.dumps(data)
  43. # 下面的url用于测试
  44. url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=a4a48ed82627149f3317ee86e249fd7d973f5bed40fcac55cc2e7ca8d9ae0c61'
  45. response = requests.post(url=url, data=json_data, headers=headers)
  46. response.raise_for_status()
  47. def send_dingtalk_notification_es(msg):
  48. headers = {"Content-Type": "application/json"}
  49. data = {
  50. "msgtype": "text",
  51. "text": {"content": msg}
  52. }
  53. json_data = json.dumps(data)
  54. # 下面的url用于测试
  55. url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=a4a48ed82627149f3317ee86e249fd7d973f5bed40fcac55cc2e7ca8d9ae0c61'
  56. response = requests.post(url=url, data=json_data, headers=headers)
  57. response.raise_for_status()
  58. def get_mongo_client(conf_path):
  59. config_parser = ConfigParser()
  60. config_parser.read(root_path + conf_path)
  61. url = config_parser.get('base', 'address')
  62. return MongoClient(url)
  63. def get_count(client, mgdb, mgtbl):
  64. db = client[mgdb]
  65. collection = db[mgtbl]
  66. return collection.count()
  67. def get_count_null(client, mgdb, mgtbl):
  68. db = client[mgdb]
  69. collection = db[mgtbl]
  70. # 计数`date`字段不为null的文档
  71. # return collection.count_documents({'date': {'$ne': None}})
  72. # 计数`date` 为null的文档
  73. return collection.count_documents({'date': None})
  74. def get_old_count(mgdb, mgtbl):
  75. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  76. result = get_count(client, mgdb, mgtbl)
  77. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  78. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  79. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  80. return result
  81. def get_clu_count_null(mgdb, mgtbl):
  82. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  83. result = get_count_null(client, mgdb, mgtbl)
  84. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  85. f'{NORM_MGT} 集群 mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  86. f'{NORM_MGT} 集群date字段为空 count: {NORM_GRN}{result}')
  87. return result
  88. def get_dev_count_null(mgdb, mgtbl):
  89. client = get_mongo_client('/../datasource/mongo/mongo-cts-dev-rw-200-test.ini')
  90. result = get_count_null(client, mgdb, mgtbl)
  91. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  92. f'{NORM_MGT} dev source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  93. f'{NORM_MGT} dev data count: {NORM_GRN}{result}')
  94. return result
  95. def get_clu_count(mgdb, mgtbl):
  96. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  97. result = get_count(client, mgdb, mgtbl)
  98. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  99. f'{NORM_MGT} 大数据集群mongo sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  100. f'{NORM_MGT} 大数据集群mongo data count: {NORM_GRN}{result}')
  101. return result
def get_diff_logic(spark, record, dt):
    """Compare one country/catalog's DWD count against the mirror Mongo cluster.

    Prints a consistency warning when the mirror holds documents whose `date`
    field is null, then appends the counts to task.cts_mirror_count.

    Args:
        spark: project SparkSQL wrapper; spark.query(sql)[0].collect() runs SQL.
        record: row-like mapping with keys 'mgdb', 'catalog' and 'cnt'
            (the DWD-side count computed in main()).
        dt: partition date string for the stats table.
    """
    mgdb = record['mgdb']
    catalog = record['catalog']
    bigdata_count = record['cnt']
    clu_cnt = get_clu_count(mgdb, catalog)
    date_null_cnt = get_clu_count_null(mgdb, catalog)
    # Difference between the two Mongo-vs-DWD counts (informational only:
    # the alert condition below no longer uses it).
    cnt_diff = clu_cnt - bigdata_count
    # The original trigger also fired on cnt_diff != 0; now only null-date
    # documents raise the warning.
    # if cnt_diff != 0 or date_null_cnt != 0:
    if date_null_cnt != 0:
        msg3 = (
            f"\n"
            f"--------------------------------\n"
            f"镜像_mir 数据一致性警告\n"
            f"--------------------------------\n"
            f"在 {mgdb}_{catalog} 详细差异报告:\n\n"
            f"\n"
            f"--------------------------------\n"
            f"计数对比:\n"
            f" 大数据_镜像mongo 计数: {clu_cnt}\n"
            f" 大数据平台 DWD 计数: {bigdata_count}\n"
            f" 大数据_镜像mongo `date`字段为空 计数: {date_null_cnt}\n"
            f"\n"
            f"请检查原因 \n"
            f"\n"
            f"--------------------------------\n"
        )
        print(msg3)
        # DingTalk push is disabled — the warning is only printed to the log.
        # send_dingtalk_notification(msg3)
    # Persist the final per-country statistics row for this run.
    statistical_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    sql_insert_cnt = f"""
    insert into table task.cts_mirror_count
    select '{mgdb}','{catalog}',{bigdata_count},{clu_cnt},'{statistical_time}','{dt}'
    """
    spark.query(sql_insert_cnt)[0].collect()
  138. def main():
  139. CONFIG, _ = parse_args(sys.argv[1:])
  140. dt = CONFIG.get('dt')
  141. ydt = CONFIG.get('ydt')
  142. spark = SparkSQL()
  143. spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
  144. 'hive.exec.dynamic.partition.mode': 'nonstrict',
  145. 'spark.yarn.queue': 'cts',
  146. 'spark.sql.crossJoin.enabled': 'true',
  147. 'spark.executor.memory': '8g',
  148. 'spark.executor.memoryOverhead': '2048',
  149. 'spark.driver.memory': '4g',
  150. 'spark.executor.instances': "12",
  151. 'spark.executor.cores': '4',
  152. "spark.sql.hive.filesourcePartitionFileCacheSize":"536870912"
  153. }
  154. im_sql = (
  155. f"select i.code3 as code3,code.english_name as country_name,concat(code.english_name,'_mir') as mgdb,cnt,'shipments_imports' as catalog"
  156. f" from"
  157. f"( select country_code as code3 ,count(1) as cnt from (select country_code from dwd.cts_mirror_country_im where dt ='{ydt}') im "
  158. f"group by country_code) i left join dim.cts_mirror_monitor code"
  159. f" on i.code3 = code.code3 where code.english_name is not null")
  160. ex_sql = (
  161. f"select i.code3 as code3,code.english_name as country_name,concat(code.english_name,'_mir') as mgdb,cnt,'shipments_exports' as catalog "
  162. f" from"
  163. f"( select country_code as code3 ,count(1) as cnt from (select country_code from dwd.cts_mirror_country_ex where dt ='{ydt}') ex "
  164. f"group by country_code) i left join dim.cts_mirror_monitor code"
  165. f" on i.code3 = code.code3 where code.english_name is not null")
  166. res_im = spark.query(im_sql)[0].collect()
  167. res_ex = spark.query(ex_sql)[0].collect()
  168. for record in res_im:
  169. get_diff_logic(spark, record,dt)
  170. for record in res_ex:
  171. get_diff_logic(spark, record,dt)
  172. sql_overwrite_cnt = f"""
  173. INSERT overwrite TABLE task.cts_mirror_count
  174. SELECT country,
  175. catalog,
  176. dwd_cnt,
  177. mongo_cnt,
  178. creat_time,
  179. dt
  180. FROM
  181. ( SELECT *,
  182. row_number() over (partition BY country,catalog
  183. ORDER BY `creat_time` DESC) AS rk
  184. FROM task.cts_mirror_count
  185. WHERE dt ={dt} ) tmp
  186. where rk =1
  187. """
  188. spark.query(sql_overwrite_cnt)[0].collect()
  189. check_call_count()
  190. if __name__ == '__main__':
  191. main()
# CREATE TABLE task.cts_mirror_count
# (
#     `country`    string COMMENT 'mgdb (mirror database name)',
#     `catalog`    string COMMENT 'import/export type',
#     `dwd_cnt`    bigint COMMENT 'row count on the DWD side',
#     `mongo_cnt`  bigint COMMENT 'row count in the mirror Mongo cluster',
#     `creat_time` STRING COMMENT 'statistics timestamp'
# )
# PARTITIONED BY ( `dt` string )
# TBLPROPERTIES ( 'COMMENT' = 'Count statistics for data synced to the big-data platform');