# Notify the relevant people via a DingTalk robot that today's incremental
# data has landed and the ES index refresh has been triggered.
import sys
import re
import os

# Make the project root importable regardless of where this script lives
# inside the tendata-warehouse checkout.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.log_utils import pretty_print
from dw_base import *
import requests
import json
import time
import random
from dw_base.scheduler.polling_scheduler import get_sink_count
from dw_base.utils.config_utils import parse_args
from dw_base.spark.spark_sql import SparkSQL


def send_dingtalk_notification(msg):
    """Post *msg* as a plain-text message to the DingTalk robot webhook.

    Args:
        msg: message body; the "at" block pings the hard-coded mobile number.

    Raises:
        requests.HTTPError: on a non-2xx HTTP response.
        RuntimeError: when DingTalk itself reports a failure — it returns
            HTTP 200 with a non-zero ``errcode`` in the JSON body, so
            ``raise_for_status()`` alone would silently miss it.
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {"atMobiles": ["13924570409"]},
    }
    # SECURITY(review): access token (and the mobile number above) are
    # hard-coded in source; consider moving them to config / environment.
    url = ('https://oapi.dingtalk.com/robot/send?access_token='
           'bda512e1f980c8d126361afbae9d744e9885705ce6ed047395a1f6bc4114114d')
    # Timeout so a hung webhook cannot stall the whole batch job.
    response = requests.post(url=url, data=json.dumps(data), headers=headers,
                             timeout=10)
    response.raise_for_status()
    body = response.json()
    if body.get('errcode', 0) != 0:
        raise RuntimeError(
            f"DingTalk error {body.get('errcode')}: {body.get('errmsg')}")


if __name__ == '__main__':
    CONFIG, _ = parse_args(sys.argv[1:])
    start_date = CONFIG.get('start-date')
    stop_date = CONFIG.get('stop-date')
    mgdb = CONFIG.get('mgdb')
    mgtbl = CONFIG.get('mgtbl')
    batch_id = CONFIG.get('batch_id')
    cdt = time.strftime('%Y%m%d', time.localtime())  # today's partition key

    # Number of rows newly sunk for this Mongo db/table in the date window.
    count = get_sink_count(mgdb, mgtbl, start_date, stop_date)

    spark = SparkSQL()
    spark._final_spark_config = {
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'spark.yarn.queue': 'cts',
        'spark.sql.crossJoin.enabled': 'true',
        'spark.executor.memory': '6g',
        'spark.executor.memoryOverhead': '2048',
        'spark.driver.memory': '4g',
        'spark.executor.instances': '15',
        'spark.executor.cores': '2',
    }

    if count > 0:
        try:
            # Random delay spreads out concurrent runs so their order_id
            # values (row count + 1) are less likely to collide.
            delay_times = [0, 10, 18, 26, 39, 45, 52, 60, 70, 80, 90, 100]
            delay = random.choice(delay_times)
            print(f"随机延时时间为:{delay}秒")
            time.sleep(delay)

            # Sequence number of today's notification = rows already logged
            # for this dt partition, plus one.
            sql = (f"select count(1) as cnt from task.cts_incr_updated_data_cnt "
                   f"where dt = '{cdt}'")
            res = spark.query(sql)[0].collect()
            order_id = int(res[0].cnt) + 1

            # Compute the timestamp once so the audit row and the
            # notification message always agree.
            now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

            # Record this load in the audit table (last column fills the
            # dt partition via dynamic partitioning).
            sql_insert_cnt = f"""
 insert into table task.cts_incr_updated_data_cnt select 
'{mgdb}','{mgtbl}',{count},'{now}','{cdt}'
 """
            spark.query(sql_insert_cnt)[0].collect()

            msg = (f"数据上新提醒 @13924570409\n"
                   f"{now} ({order_id})\n"
                   f"{mgdb}.{mgtbl} 今日新增数据量: {count} 已入库完毕,\n调用接口成功,正在刷es索引!"
                   f"本批数据batch_id为: {batch_id} ")
            pretty_print(f'{NORM_CYN}{now} ({order_id})'
                         f'{NORM_MGT}已发送通知: {NORM_GRN} {msg} ')
            send_dingtalk_notification(msg)
        except Exception as e:
            # Best-effort notification: log and continue rather than fail
            # the surrounding scheduler job.
            print(f"发生错误: {e}")

# Reference DDL for the audit table written above:
#
# CREATE TABLE task.`cts_incr_updated_data_cnt`
# (
#     `mgdb` STRING COMMENT 'mgdb',
#     `mgtbl` STRING COMMENT 'mgtbl',
#     `count` int COMMENT '计数',
#     `created_time` STRING COMMENT '统计时间'
# )
# COMMENT 'cts_incr_updated_data_cnt'
# PARTITIONED BY (`dt` STRING)
# STORED AS ORC
# tblproperties ('orc.compress' = 'ZLIB')