# dingtalk_task_monitor_new.py
  1. # 用于钉钉监控T+1任务是否需要重跑
  2. import sys
  3. import re
  4. import os
  5. import requests
  6. import json
  7. abspath = os.path.abspath(__file__)
  8. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  9. sys.path.append(root_path)
  10. from dw_base.spark.spark_sql import SparkSQL
  11. from dw_base.utils.log_utils import pretty_print
  12. from configparser import ConfigParser
  13. from datetime import time, datetime
  14. from pymongo import MongoClient
  15. from dw_base import *
  16. from dw_base.scheduler.polling_scheduler import get_mongo_client
  17. from dw_base.utils.config_utils import parse_args
  18. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  19. from dw_base.scheduler.mg2es.es_operator import ESOperator
  20. from elasticsearch.exceptions import NotFoundError
  21. call_count = 0
  22. def check_call_count():
  23. global call_count
  24. if call_count == 0:
  25. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  26. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 1 '
  27. f'{NORM_MGT} call_count =>{call_count}')
  28. print('${setValue(is_run=%s)}' % '1')
  29. else:
  30. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  31. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 0 '
  32. f'{NORM_MGT} call_count =>{call_count}')
  33. print('${setValue(is_run=%s)}' % '0')
  34. def send_dingtalk_notification(msg):
  35. global call_count
  36. call_count += 1
  37. headers = {"Content-Type": "application/json"}
  38. data = {
  39. "msgtype": "text",
  40. "text": {"content": msg}
  41. }
  42. json_data = json.dumps(data)
  43. # 下面的url用于测试
  44. url = 'https://oapi.dingtalk.com/robot/send?access_token=d4955560edf9d78fbf5273fe3ea4022ecf5955570a68ff710f7fe81926dff71e'
  45. response = requests.post(url=url, data=json_data, headers=headers)
  46. response.raise_for_status()
  47. def send_dingtalk_notification_es(msg):
  48. headers = {"Content-Type": "application/json"}
  49. data = {
  50. "msgtype": "text",
  51. "text": {"content": msg}
  52. }
  53. json_data = json.dumps(data)
  54. # 下面的url用于测试
  55. url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=a4a48ed82627149f3317ee86e249fd7d973f5bed40fcac55cc2e7ca8d9ae0c61'
  56. response = requests.post(url=url, data=json_data, headers=headers)
  57. response.raise_for_status()
  58. def get_mongo_client(conf_path):
  59. config_parser = ConfigParser()
  60. config_parser.read(root_path + conf_path)
  61. url = config_parser.get('base', 'address')
  62. return MongoClient(url)
  63. def get_count(client, mgdb, mgtbl):
  64. db = client[mgdb]
  65. collection = db[mgtbl]
  66. return collection.count()
  67. def get_count_null(client, mgdb, mgtbl):
  68. db = client[mgdb]
  69. collection = db[mgtbl]
  70. # 计数`date`字段不为null的文档
  71. # return collection.count_documents({'date': {'$ne': None}})
  72. # 计数`date` 为null的文档
  73. return collection.count_documents({'date': None})
  74. def get_old_count(mgdb, mgtbl):
  75. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  76. result = get_count(client, mgdb, mgtbl)
  77. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  78. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  79. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  80. return result
  81. def get_clu_count_null(mgdb, mgtbl):
  82. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  83. result = get_count_null(client, mgdb, mgtbl)
  84. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  85. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  86. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  87. return result
  88. def get_dev_count_null(mgdb, mgtbl):
  89. client = get_mongo_client('/../datasource/mongo/mongo-cts-dev-rw-200-test.ini')
  90. result = get_count_null(client, mgdb, mgtbl)
  91. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  92. f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  93. f'{NORM_MGT} old data count: {NORM_GRN}{result}')
  94. return result
  95. def get_clu_count(mgdb, mgtbl):
  96. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  97. result = get_count(client, mgdb, mgtbl)
  98. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  99. f'{NORM_MGT} 大数据集群mongo sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  100. f'{NORM_MGT} 大数据集群mongo data count: {NORM_GRN}{result}')
  101. return result
  102. def get_bigdata_count(mgdb, mgtbl, dt, spark,cdt):
  103. sql = (f"select count(1) cnt "
  104. f"from dwd.cts_{mgdb}_{mgtbl} "
  105. f" where dt in ('19700101', {dt},{cdt}) ")
  106. res = spark.query(sql)[0].collect()
  107. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  108. f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}dwd.cts_{mgdb}_{mgtbl} '
  109. f'{NORM_MGT} 大数据dwd 1970+昨日分区+当日分区 count: {NORM_GRN}{res[0].cnt}')
  110. return res[0].cnt
  111. def get_bigdata_global_bol_count(catalog, dt, spark):
  112. sql = (f"""
  113. select sum(cnt) cnt from (select count(1) cnt from dwd.`cts_north_america_bol_{catalog}` where dt in ('19700101', {dt})
  114. union all select count(1) from dwd.`cts_central_america_bol_{catalog}` where dt in ('19700101', {dt})
  115. union all select count(1) from dwd.`cts_south_america_bol_{catalog}` where dt in ('19700101', {dt})
  116. union all select count(1) from dwd.`cts_asia_bol_{catalog}` where dt in ('19700101', {dt})
  117. union all select count(1) from dwd.`cts_middle_east_bol_{catalog}` where dt in ('19700101', {dt})
  118. union all select count(1) from dwd.`cts_europe_bol_{catalog}` where dt in ('19700101', {dt})
  119. union all select count(1) from dwd.`cts_africa_bol_{catalog}` where dt in ('19700101', {dt})
  120. union all select count(1) from dwd.`cts_oceania_bol_{catalog}` where dt in ('19700101', {dt}) ) a""")
  121. res = spark.query(sql)[0].collect()
  122. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  123. f'{NORM_MGT} 大数据dwd表名: global_bol 1拆8 '
  124. f'{NORM_MGT} 大数据dwd 1970+昨日分区count: {NORM_GRN}{res[0].cnt}')
  125. return res[0].cnt
def get_year_count(mgdb, catalog, dt, spark):
    """Compare per-year document counts between the dwd Hive table and the
    corresponding Elasticsearch yearly indices, alerting via the ES DingTalk
    bot on any mismatch or missing index.

    No-op for "global_bol" (its dwd data is split across regional tables).
    Sends nothing back to the caller; results are logged via pretty_print.
    """
    if mgdb != "global_bol":
        # Group dwd rows by year derived from the epoch-millis `date` column,
        # shifted back 8h (UTC+8 -> UTC) — TODO confirm the intended timezone.
        sql = (f"select from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy') as year,count(1) hive_cnt "
               f"from dwd.cts_{mgdb}_{catalog} "
               f" where dt in ('19700101', {dt}) "
               f" group by from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy')"
               f" order by from_unixtime(cast(`date`/1000 as int)- 8 * 60 * 60, 'yyyy')")
        res = spark.query(sql)[0].collect()
        hive_year_cnt_dict = {}  # year -> hive count, for the summary log
        es_year_cnt_dict = {}    # year -> ES count, for the summary log
        host, port = ConfReader().get_es_conf()
        es_operator = ESOperator(host, port)
        for record in res:
            year = record['year']
            hive_cnt = record['hive_cnt']
            hive_year_cnt_dict[year] = hive_cnt
            # index_name = 'customs_' + str(catalogs[catalog]) + '_' + mgdb + '-' + year
            index_name = str(catalog) + '_' + mgdb + '-' + year
            try:
                ES_year_cnt = es_operator.get_index_document_count(index_name)
            except NotFoundError:
                # Sent via the ES bot because the regular DingTalk bot's
                # keyword filter would drop this message.
                msg7 = (f"ES Index {index_name} not found.\n"
                        f" 请检查原因\n"
                        )
                # print(msg7)
                send_dingtalk_notification_es(msg7)
                ES_year_cnt = 0
            if ES_year_cnt is None:
                ES_year_cnt = 0
            es_year_cnt_dict[year] = ES_year_cnt
            es_diff = ES_year_cnt - hive_cnt
            if es_diff != 0:
                msg5 = (
                    f"-----------------------------\n"
                    f"\n"
                    f"{mgdb}_{catalog} - 数据一致性警告:ES{year}与大数据DWD的{year}数量不一致。\n\n"
                    f"详细差异报告:\n"
                    f"-----------------------------------------------------------------------\n"
                    f"年份:{year}\n"
                    f"ES{year} 计数:{ES_year_cnt}\n"
                    f"大数据{year} 计数:{hive_cnt}\n"
                    f"差异值:{es_diff}\n"
                    f"-----------------------------------------------------------------------\n"
                    f"\n"
                    f"请检查原因 \n"
                    f"\n"
                    f"-----------------------------\n"
                )
                # print(msg5)
                send_dingtalk_notification_es(msg5)
        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                     f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}dwd.cts_{mgdb}_{catalog} '
                     f'{NORM_MGT} 大数据hive_year_cnt_dict {NORM_GRN}{hive_year_cnt_dict}'
                     f'{NORM_MGT} es_year_cnt_dict {NORM_GRN}{es_year_cnt_dict}'
                     )
  182. def get_count_range_date(mgdb, mgtbl, target_date):
  183. """
  184. 统计 date 字段值小于目标日期的文档总数
  185. Args:
  186. client: MongoDB客户端实例
  187. mgdb: 数据库名称
  188. mgtbl: 集合名称
  189. target_date_str: 目标日期字符串 (格式: "YYYYMMDD")
  190. Returns:
  191. int: 符合条件的文档数量
  192. """
  193. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  194. db = client[mgdb]
  195. collection = db[mgtbl]
  196. # 将输入的字符串转换为 datetime 对象
  197. target_date = datetime.strptime(target_date, "%Y%m%d").replace(
  198. tzinfo=None # 如果数据库时间不带时区,可以移除此行
  199. )
  200. count = collection.count_documents({'date': {'$lt': target_date}})
  201. return count
def main():
    """T+1 consistency monitor.

    For every (mgdb, catalog) pair registered in task.mg_count_monitor:
    compare counts between the legacy Mongo, the big-data cluster Mongo and
    the dwd Hive layer, send a DingTalk alert on any mismatch (unless the
    database is in the exclusion list), persist the per-country count, then
    emit the ``is_run`` flag for the scheduler via check_call_count().
    """
    CONFIG, _ = parse_args(sys.argv[1:])
    # dt: previous-day partition; cdt: current-day partition (YYYYMMDD strings)
    dt = CONFIG.get('dt')
    cdt = CONFIG.get('cdt')
    spark = SparkSQL()
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true',
                                 'spark.executor.memory': '6g',
                                 'spark.executor.memoryOverhead': '2048',
                                 'spark.driver.memory': '4g',
                                 'spark.executor.instances': "15",
                                 'spark.executor.cores': '2'
                                 }
    # Monitored (mgdb, catalog) pairs come from the config table.
    sql = (f"select mgdb, catalog from task.mg_count_monitor "
           f"where is_deleted = '0'")
    res = spark.query(sql)[0].collect()
    # dwd table name -> big-data (cluster) mongo database name
    mgdbs_prod = {
        'dwd表名': '大数据mongo库名',
        'un_global_trade_tatistics': 'united_nations_stat',
        "global_bol": "global_bol"
    }
    # dwd table name -> legacy (old) mongo database name
    mgdbs_old = {
        'dwd表名': 'old_mongo库名',
        'un_global_trade_tatistics': 'united_nations_stat',
        "global_bol": "global_sea"
    }
    catalogs = {
        'im': 'shipments_imports',
        'ex': 'shipments_exports',
    }
    # Database names excluded from old_mongo-based DingTalk alerting
    excluded_dbs = ["un_global_trade_tatistics",
                    "north_america_bol",
                    "central_america_bol",
                    "south_america_bol",
                    "asia_bol",
                    "middle_east_bol",
                    "europe_bol",
                    "africa_bol",
                    "oceania_bol"]
    # For testing:
    # res = [{"mgdb": "global_bol", "catalog": "im"}]
    # res = [{"mgdb": "ethiopia", "catalog": "ex"}]
    # "Mirror" databases are only synced up to a fixed cut-off date; their
    # cluster count is taken as docs with date < that cut-off (see below).
    mirror_dbs = ["fiji"]
    mirror_dbs_date = {"fiji_im": "20211101", "fiji_ex": "20211101"}
    for record in res:
        mgdb = record['mgdb']
        catalog = record['catalog']
        prod_mgdb = mgdbs_prod.get(record['mgdb'], mgdb)
        old_mgdb = mgdbs_old.get(record['mgdb'], mgdb)
        if mgdb == "global_bol":
            old_cnt = get_old_count(old_mgdb, catalogs[catalog])
            # old mongo vs the dwd split tables (global_bol is split 1 -> 8 regions)
            clu_cnt = get_bigdata_global_bol_count(catalog, dt, spark)
            bigdata_count = get_bigdata_global_bol_count(catalog, dt, spark)
            date_null_cnt=get_clu_count_null(mgdb, catalogs[catalog])
        else:
            old_cnt = get_old_count(prod_mgdb, catalogs[catalog])
            clu_cnt = get_clu_count(prod_mgdb, catalogs[catalog])
            bigdata_count = get_bigdata_count(mgdb, catalog, dt, spark,cdt)
            date_null_cnt = get_clu_count_null(mgdb, catalogs[catalog])
        # get_year_count(mgdb, catalog, dt, spark)
        if mgdb in mirror_dbs:
            clu_cnt = get_count_range_date(mgdb, catalogs[catalog], target_date=mirror_dbs_date[f"{mgdb}_{catalog}"])
            print(f"{mgdb}{catalogs[catalog]} clu_cnt: {clu_cnt}")
        # compare the counts of the two mongos
        cnt_diff = old_cnt - clu_cnt
        # compare old mongo against dwd
        bd_diff = old_cnt - bigdata_count
        if bd_diff != 0 or cnt_diff != 0 or date_null_cnt != 0:
            msg3 = (
                f"\n"
                f"--------------------------------\n"
                f"数据一致性警告\n"
                f"--------------------------------\n"
                f"在 {mgdb}_{catalog} 详细差异报告:\n\n"
                f"\n"
                f"--------------------------------\n"
                f"计数对比:\n"
                f" old_mongo 计数: {old_cnt}\n"
                f" 大数据_mongo 计数: {clu_cnt}\n"
                f" 大数据平台 DWD 计数: {bigdata_count}\n"
                f" 大数据_mongo `date`字段为空 计数: {date_null_cnt}\n"
                f"\n"
                f"请检查原因 \n"
                f"\n"
                f"--------------------------------\n"
            )
            if mgdb not in excluded_dbs:
                send_dingtalk_notification(msg3)
        # record the final per-country count
        statistical_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        sql_insert_cnt=f"""
insert into table task.cts_country_count
select '{mgdb}','{catalog}',{clu_cnt},'{statistical_time}','{dt}'
"""
        spark.query(sql_insert_cnt)[0].collect()
    # Deduplicate: keep only the latest record per (country, catalog) in this
    # dt partition.
    sql_overwrite_cnt = f"""
INSERT overwrite TABLE task.cts_country_count
SELECT country,
catalog,
cnt,
creat_time,
dt
FROM
( SELECT *,
row_number() over (partition BY country,catalog
ORDER BY `creat_time` DESC) AS rk
FROM task.cts_country_count
WHERE dt ={dt} ) tmp
where rk =1
"""
    spark.query(sql_overwrite_cnt)[0].collect()
    check_call_count()
# Script entry point: run the T+1 count-consistency monitor.
if __name__ == '__main__':
    main()
# Reference DDL of the per-country count table written by main():
# CREATE TABLE task.cts_country_count
# (
#     `country` string COMMENT 'mgdb',
#     `catalog` string COMMENT 'import/export type',
#     `cnt` bigint comment 'row count',
#     `creat_time` STRING COMMENT 'time of the count'
# )
# PARTITIONED BY ( `dt` string )
# TBLPROPERTIES ( 'COMMENT' = 'counts of data synced to the big-data platform');