# dingtalk_notifier.py

  1. # 调用钉钉机器人通知相关人员更新ES
  2. import sys
  3. import re
  4. import os
  5. abspath = os.path.abspath(__file__)
  6. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  7. sys.path.append(root_path)
  8. from dw_base.utils.log_utils import pretty_print
  9. from dw_base import *
  10. import requests
  11. import json
  12. import time
  13. from dw_base.scheduler.polling_scheduler import get_sink_count
  14. from dw_base.utils.config_utils import parse_args
  15. from dw_base.spark.spark_sql import SparkSQL
  16. import random
  17. def send_dingtalk_notification(msg):
  18. headers = {"Content-Type": "application/json"}
  19. data = {
  20. "msgtype": "text",
  21. "text": {"content": msg},
  22. "at": {"atMobiles": ["13924570409"]}
  23. }
  24. json_data = json.dumps(data)
  25. url = 'https://oapi.dingtalk.com/robot/send?access_token=bda512e1f980c8d126361afbae9d744e9885705ce6ed047395a1f6bc4114114d'
  26. response = requests.post(url=url, data=json_data, headers=headers)
  27. response.raise_for_status()
  28. if __name__ == '__main__':
  29. CONFIG, _ = parse_args(sys.argv[1:])
  30. start_date = CONFIG.get('start-date')
  31. stop_date = CONFIG.get('stop-date')
  32. mgdb = CONFIG.get('mgdb')
  33. mgtbl = CONFIG.get('mgtbl')
  34. batch_id = CONFIG.get('batch_id')
  35. cdt =f"{time.strftime('%Y%m%d', time.localtime())}"
  36. count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
  37. spark = SparkSQL()
  38. spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
  39. 'hive.exec.dynamic.partition.mode': 'nonstrict',
  40. 'spark.yarn.queue': 'cts',
  41. 'spark.sql.crossJoin.enabled': 'true',
  42. 'spark.executor.memory': '6g',
  43. 'spark.executor.memoryOverhead': '2048',
  44. 'spark.driver.memory': '4g',
  45. 'spark.executor.instances': "15",
  46. 'spark.executor.cores': '2'
  47. }
  48. if count > 0 :
  49. try:
  50. # 定义延时时间列表
  51. delay_times = [0,10,18,26,39,45,52,60,70,80,90,100]
  52. delay = random.choice(delay_times)
  53. print(f"随机延时时间为:{delay}秒")
  54. time.sleep(delay)
  55. sql = (f"select count(1) as cnt from task.cts_incr_updated_data_cnt "
  56. f"where dt = '{cdt}'")
  57. res = spark.query(sql)[0].collect()
  58. order_id= int(res[0].cnt +1)
  59. sql_insert_cnt = f"""
  60. insert into table task.cts_incr_updated_data_cnt
  61. select '{mgdb}','{mgtbl}',{count},'{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}','{cdt}'
  62. """
  63. spark.query(sql_insert_cnt)[0].collect()
  64. msg = (f"数据上新提醒 @13924570409\n"
  65. f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} ({order_id})\n"
  66. f"{mgdb}.{mgtbl} 今日新增数据量: {count} 已入库完毕,\n调用接口成功,正在刷es索引!"
  67. f"本批数据batch_id为: {batch_id} "
  68. )
  69. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} ({order_id})'
  70. f'{NORM_MGT}已发送通知: {NORM_GRN} {msg} ')
  71. send_dingtalk_notification(msg)
  72. except Exception as e:
  73. print(f"发生错误: {e}")
  74. #
  75. #
  76. # CREATE TABLE task.`cts_incr_updated_data_cnt`
  77. # (
  78. # `mgdb` STRING COMMENT 'mgdb',
  79. # `mgtbl` STRING COMMENT 'mgtbl',
  80. # `count` int COMMENT '计数',
  81. # `created_time` STRING COMMENT '统计时间'
  82. # )
  83. # COMMENT 'cts_incr_updated_data_cnt'
  84. # PARTITIONED BY (`dt` STRING)
  85. # STORED AS ORC
  86. # tblproperties ('orc.compress' = 'ZLIB')