# dingtalk_task_monitor.py
# DingTalk monitor: checks whether the T+1 task needs to be re-run.
  2. import sys
  3. import re
  4. import os
  5. import requests
  6. import json
  7. abspath = os.path.abspath(__file__)
  8. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  9. sys.path.append(root_path)
  10. from dw_base.spark.spark_sql import SparkSQL
  11. from dw_base.utils.log_utils import pretty_print
  12. from configparser import ConfigParser
  13. from datetime import time,datetime
  14. from pymongo import MongoClient
  15. from dw_base import *
  16. from dw_base.scheduler.polling_scheduler import get_mongo_client
  17. from dw_base.utils.config_utils import parse_args
  18. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  19. from dw_base.scheduler.mg2es.es_operator import ESOperator
  20. from elasticsearch.exceptions import NotFoundError
  21. call_count = 0
  22. def check_call_count():
  23. global call_count
  24. if call_count == 0:
  25. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  26. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 1 '
  27. f'{NORM_MGT} call_count =>{call_count}')
  28. print('${setValue(is_run=%s)}' % '1')
  29. else:
  30. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  31. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 0 '
  32. f'{NORM_MGT} call_count =>{call_count}')
  33. print('${setValue(is_run=%s)}' % '0')
  34. def send_dingtalk_notification(msg):
  35. global call_count
  36. call_count += 1
  37. headers = {"Content-Type": "application/json"}
  38. data = {
  39. "msgtype": "text",
  40. "text": {"content": msg}
  41. }
  42. json_data = json.dumps(data)
  43. # 下面的url用于测试
  44. url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=a4a48ed82627149f3317ee86e249fd7d973f5bed40fcac55cc2e7ca8d9ae0c61'
  45. response = requests.post(url=url, data=json_data, headers=headers)
  46. response.raise_for_status()
  47. def send_dingtalk_notification_es(msg):
  48. headers = {"Content-Type": "application/json"}
  49. data = {
  50. "msgtype": "text",
  51. "text": {"content": msg}
  52. }
  53. json_data = json.dumps(data)
  54. # 下面的url用于测试
  55. url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=a4a48ed82627149f3317ee86e249fd7d973f5bed40fcac55cc2e7ca8d9ae0c61'
  56. response = requests.post(url=url, data=json_data, headers=headers)
  57. response.raise_for_status()
  58. def get_mongo_client(conf_path):
  59. config_parser = ConfigParser()
  60. config_parser.read(root_path + conf_path)
  61. url = config_parser.get('base', 'address')
  62. return MongoClient(url)
  63. def get_count(client, mgdb, mgtbl):
  64. db = client[mgdb]
  65. collection = db[mgtbl]
  66. return collection.count()
  67. def get_count_null(client, mgdb, mgtbl):
  68. db = client[mgdb]
  69. collection = db[mgtbl]
  70. # 计数`date`字段不为null的文档
  71. # return collection.count_documents({'date': {'$ne': None}})
  72. # 计数`date` 为null的文档
  73. return collection.count_documents({'date': None})
  74. def get_count_range_date(mgdb, mgtbl, target_date):
  75. """
  76. 统计 date 字段值小于目标日期的文档总数
  77. Args:
  78. client: MongoDB客户端实例
  79. mgdb: 数据库名称
  80. mgtbl: 集合名称
  81. target_date_str: 目标日期字符串 (格式: "YYYYMMDD")
  82. Returns:
  83. int: 符合条件的文档数量
  84. """
  85. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  86. db = client[mgdb]
  87. collection = db[mgtbl]
  88. # 将输入的字符串转换为 datetime 对象
  89. target_date = datetime.strptime(target_date, "%Y%m%d").replace(
  90. tzinfo=None # 如果数据库时间不带时区,可以移除此行
  91. )
  92. count = collection.count_documents({'date': {'$lt': target_date}})
  93. return count
  94. def get_old_count(mgdb, mgtbl):
  95. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  96. result = get_count(client, mgdb, mgtbl)
  97. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  98. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  99. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  100. return result
  101. def get_clu_count_null(mgdb, mgtbl):
  102. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  103. result = get_count_null(client, mgdb, mgtbl)
  104. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  105. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  106. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  107. return result
  108. def get_dev_count_null(mgdb, mgtbl):
  109. client = get_mongo_client('/../datasource/mongo/mongo-cts-dev-rw-200-test.ini')
  110. result = get_count_null(client, mgdb, mgtbl)
  111. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  112. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  113. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  114. return result
  115. def get_clu_count(mgdb, mgtbl):
  116. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  117. result = get_count(client, mgdb, mgtbl)
  118. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  119. f'{NORM_MGT} 大数据集群mongo sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  120. f'{NORM_MGT} 大数据集群mongo data count: {NORM_GRN}{result}')
  121. return result
  122. def get_bigdata_count(mgdb, mgtbl, dt, spark,cdt):
  123. sql = (f"select count(1) cnt "
  124. f"from dwd.cts_{mgdb}_{mgtbl} "
  125. f" where dt in ('19700101', {dt},{cdt}) ")
  126. res = spark.query(sql)[0].collect()
  127. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  128. f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}dwd.cts_{mgdb}_{mgtbl} '
  129. f'{NORM_MGT} 大数据dwd 1970+昨日分区+当日分区 count: {NORM_GRN}{res[0].cnt}')
  130. return res[0].cnt
  131. def get_bigdata_global_bol_count(catalog, dt, spark):
  132. sql = (f"""
  133. select sum(cnt) cnt from (select count(1) cnt from dwd.`cts_north_america_bol_{catalog}` where dt in ('19700101', {dt})
  134. union all select count(1) from dwd.`cts_central_america_bol_{catalog}` where dt in ('19700101', {dt})
  135. union all select count(1) from dwd.`cts_south_america_bol_{catalog}` where dt in ('19700101', {dt})
  136. union all select count(1) from dwd.`cts_asia_bol_{catalog}` where dt in ('19700101', {dt})
  137. union all select count(1) from dwd.`cts_middle_east_bol_{catalog}` where dt in ('19700101', {dt})
  138. union all select count(1) from dwd.`cts_europe_bol_{catalog}` where dt in ('19700101', {dt})
  139. union all select count(1) from dwd.`cts_africa_bol_{catalog}` where dt in ('19700101', {dt})
  140. union all select count(1) from dwd.`cts_oceania_bol_{catalog}` where dt in ('19700101', {dt}) ) a""")
  141. res = spark.query(sql)[0].collect()
  142. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  143. f'{NORM_MGT} 大数据dwd表名: global_bol 1拆8 '
  144. f'{NORM_MGT} 大数据dwd 1970+昨日分区count: {NORM_GRN}{res[0].cnt}')
  145. return res[0].cnt
def get_year_count(mgdb, catalog, dt, spark):
    """Compare per-year DWD row counts with per-year ES index document counts.

    For a given database/catalog (skipped entirely for "global_bol"), groups
    the DWD rows by the year derived from the millisecond `date` column
    (shifted back 8 hours, i.e. UTC+8 -> UTC) and compares each year's count
    against the document count of the ES index "<catalog>_<mgdb>-<year>".
    A missing index or a per-year mismatch triggers a DingTalk notification
    via send_dingtalk_notification_es(); nothing is returned.
    """
    if mgdb != "global_bol":
        # Per-year counts over the 1970 catch-all + yesterday's dt partition.
        sql = (f"select from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy') as year,count(1) hive_cnt "
               f"from dwd.cts_{mgdb}_{catalog} "
               f" where dt in ('19700101', {dt}) "
               f" group by from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy')"
               f" order by from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy')")
        res = spark.query(sql)[0].collect()
        hive_year_cnt_dict = {}  # year -> DWD row count
        es_year_cnt_dict = {}    # year -> ES document count
        host, port = ConfReader().get_es_conf()
        es_operator = ESOperator(host, port)
        for record in res:
            year = record['year']
            hive_cnt = record['hive_cnt']
            hive_year_cnt_dict[year] = hive_cnt
            # Older index naming scheme, kept for reference:
            # index_name = 'customs_' + str(catalogs[catalog]) + '_' + mgdb + '-' + year
            index_name = str(catalog) + '_' + mgdb + '-' + year
            try:
                ES_year_cnt = es_operator.get_index_document_count(index_name)
            except NotFoundError:
                # (translated) Routed through the ES notifier because of the
                # DingTalk robot's keyword filter.
                msg7 = (f"ES Index {index_name} not found.\n"
                        f" 请检查原因\n"
                        )
                # print(msg7)
                send_dingtalk_notification_es(msg7)
                # Treat a missing index as zero documents and keep going.
                ES_year_cnt = 0
            if ES_year_cnt is None:
                ES_year_cnt = 0
            es_year_cnt_dict[year] = ES_year_cnt
            es_diff = ES_year_cnt - hive_cnt
            if es_diff != 0:
                # Per-year mismatch report (message text kept verbatim).
                msg5 = (
                    f"-----------------------------\n"
                    f"\n"
                    f"{mgdb}_{catalog} - 数据一致性警告:ES{year}与大数据DWD的{year}数量不一致。\n\n"
                    f"详细差异报告:\n"
                    f"-----------------------------------------------------------------------\n"
                    f"年份:{year}\n"
                    f"ES{year} 计数:{ES_year_cnt}\n"
                    f"大数据{year} 计数:{hive_cnt}\n"
                    f"差异值:{es_diff}\n"
                    f"-----------------------------------------------------------------------\n"
                    f"\n"
                    f"请检查原因 \n"
                    f"\n"
                    f"-----------------------------\n"
                )
                # print(msg5)
                send_dingtalk_notification_es(msg5)
        # Summary log of both per-year count dicts.
        # NOTE(review): `time` here is shadowed by `from datetime import time`
        # at the top of the file unless `from dw_base import *` re-exports the
        # time module — confirm this call works at runtime.
        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                     f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}dwd.cts_{mgdb}_{catalog} '
                     f'{NORM_MGT} 大数据hive_year_cnt_dict {NORM_GRN}{hive_year_cnt_dict}'
                     f'{NORM_MGT} es_year_cnt_dict {NORM_GRN}{es_year_cnt_dict}'
                     )
def main():
    """Entry point of the T+1 consistency monitor.

    For every (mgdb, catalog) pair registered in task.mg_count_monitor:
    compare document counts between the old source mongo, the big-data
    cluster mongo and the DWD layer; send a DingTalk alert on any mismatch
    (or non-null-date anomaly); persist the per-country count into
    task.cts_country_count; finally emit the `is_run` passthrough flag via
    check_call_count().
    """
    CONFIG, _ = parse_args(sys.argv[1:])
    # dt = yesterday's partition, cdt = current day's partition (YYYYMMDD
    # strings), per the partition labels logged in get_bigdata_count().
    dt = CONFIG.get('dt')
    cdt = CONFIG.get('cdt')
    spark = SparkSQL()
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true',
                                 'spark.executor.memory': '6g',
                                 'spark.executor.memoryOverhead': '2048',
                                 'spark.driver.memory': '4g',
                                 'spark.executor.instances': "15",
                                 'spark.executor.cores': '2'
                                 }
    # Load the list of monitored (mgdb, catalog) pairs.
    sql = (f"select mgdb, catalog from task.mg_count_monitor "
           f"where is_deleted = '0'")
    res = spark.query(sql)[0].collect()
    # DWD name -> cluster-mongo database name.
    # NOTE(review): the 'dwd表名' entry looks like an inline legend row rather
    # than a real mapping — harmless with .get(), but confirm.
    mgdbs_prod = {
        'dwd表名': '大数据mongo库名',
        'un_global_trade_tatistics': 'united_nations_stat',
        "global_bol": "global_bol"
    }
    # DWD name -> old-mongo database name.
    mgdbs_old = {
        'dwd表名': 'old_mongo库名',
        'un_global_trade_tatistics': 'united_nations_stat',
        "global_bol": "global_sea"
    }
    # Catalog code -> mongo collection name.
    catalogs = {
        'im': 'shipments_imports',
        'ex': 'shipments_exports',
    }
    # Databases excluded from mismatch alerting (alerts are suppressed, but
    # counts are still collected and persisted).
    excluded_dbs = ["un_global_trade_tatistics",
                    "north_america_bol",
                    "central_america_bol",
                    "south_america_bol",
                    "asia_bol",
                    "middle_east_bol",
                    "europe_bol",
                    "africa_bol",
                    "oceania_bol"]
    # The lines below are for testing:
    # res = [{"mgdb": "global_bol", "catalog": "im"}]
    # res = [{"mgdb": "ethiopia", "catalog": "ex"}]
    # Mirror databases only hold data up to a cutoff date; for those the
    # cluster count is restricted to documents before that date.
    mirror_dbs = ["fiji"]
    mirror_dbs_date = {"fiji_im": "20211101", "fiji_ex": "20211101"}
    for record in res:
        mgdb = record['mgdb']
        catalog = record['catalog']
        prod_mgdb = mgdbs_prod.get(record['mgdb'], mgdb)
        old_mgdb = mgdbs_old.get(record['mgdb'], mgdb)
        if mgdb == "global_bol":
            old_cnt = get_old_count(old_mgdb, catalogs[catalog])
            # global_bol is split 1->8 in DWD; the DWD sum stands in for the
            # cluster-mongo count as well (original note: old mongo vs the
            # split DWD tables).
            clu_cnt = get_bigdata_global_bol_count(catalog, dt, spark)
            bigdata_count = get_bigdata_global_bol_count(catalog, dt, spark)
            date_null_cnt=get_clu_count_null(mgdb, catalogs[catalog])
        else:
            old_cnt = get_old_count(prod_mgdb, catalogs[catalog])
            clu_cnt = get_clu_count(prod_mgdb, catalogs[catalog])
            # NOTE(review): uses `mgdb`, not `prod_mgdb` — for mapped names
            # (e.g. un_global_trade_tatistics) these two calls target a
            # different name than get_clu_count above; confirm intended.
            bigdata_count = get_bigdata_count(mgdb, catalog, dt, spark,cdt)
            date_null_cnt = get_clu_count_null(mgdb, catalogs[catalog])
        # get_year_count(mgdb, catalog, dt, spark)
        if mgdb in mirror_dbs:
            clu_cnt = get_count_range_date(mgdb, catalogs[catalog], target_date=mirror_dbs_date[f"{mgdb}_{catalog}"])
            print(f"{mgdb}{catalogs[catalog]} clu_cnt: {clu_cnt}")
        # Old mongo vs cluster mongo.
        cnt_diff = old_cnt - clu_cnt
        # Old mongo vs DWD.
        bd_diff = old_cnt - bigdata_count
        if bd_diff != 0 or cnt_diff != 0 or date_null_cnt != 0:
            msg3 = (
                f"\n"
                f"--------------------------------\n"
                f"数据一致性警告\n"
                f"--------------------------------\n"
                f"在 {mgdb}_{catalog} 详细差异报告:\n\n"
                f"\n"
                f"--------------------------------\n"
                f"计数对比:\n"
                f" old_mongo 计数: {old_cnt}\n"
                f" 大数据_mongo 计数: {clu_cnt}\n"
                f" 大数据平台 DWD 计数: {bigdata_count}\n"
                f" 大数据_mongo `date`字段为空 计数: {date_null_cnt}\n"
                f"\n"
                f"请检查原因 \n"
                f"\n"
                f"--------------------------------\n"
            )
            # Alert only for databases not explicitly excluded.
            if mgdb not in excluded_dbs:
                send_dingtalk_notification(msg3)
        # Persist this country's count into task.cts_country_count.
        # NOTE(review): `time` may be shadowed by `from datetime import time`
        # at the top of this file — confirm time.strftime works at runtime.
        statistical_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        sql_insert_cnt=f"""
        insert into table task.cts_country_count
        select '{mgdb}','{catalog}',{clu_cnt},'{statistical_time}','{dt}'
        """
        spark.query(sql_insert_cnt)[0].collect()
        # Deduplicate today's partition: keep only the most recent row per
        # (country, catalog).
        # NOTE(review): assumed to run inside the loop (flat source gives no
        # indentation cue); running it once after the loop would dedupe the
        # partition a single time instead of once per record — confirm.
        sql_overwrite_cnt = f"""
        INSERT overwrite TABLE task.cts_country_count
        SELECT country,
               catalog,
               cnt,
               creat_time,
               dt
        FROM
          ( SELECT *,
                   row_number() over (partition BY country,catalog
                                      ORDER BY `creat_time` DESC) AS rk
           FROM task.cts_country_count
           WHERE dt ={dt} ) tmp
        where rk =1
        """
        spark.query(sql_overwrite_cnt)[0].collect()
    # Emit the `is_run` flag for the scheduler based on alerts sent.
    check_call_count()


if __name__ == '__main__':
    main()
# Reference DDL for the stats table written by main():
# CREATE TABLE task.cts_country_count
# (
#     `country` string COMMENT 'mgdb',
#     `catalog` string COMMENT 'import/export type (进出口类型)',
#     `cnt` bigint COMMENT 'row count (数据量)',
#     `creat_time` STRING COMMENT 'collection timestamp (统计时间)'
# )
# PARTITIONED BY ( `dt` string )
# TBLPROPERTIES ( 'COMMENT' = 'counts synced to the big-data platform (同步到大数据平台的数据量统计)');