# Notify the relevant people via a DingTalk robot that the ES index must be refreshed.
import sys
import re
import os
# Locate the project root ("tendata-warehouse") from this file's absolute path
# and put it on sys.path so the project-local dw_base package imports below
# resolve when this script is run directly.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)
from dw_base.utils.log_utils import pretty_print
from dw_base import *
import requests
import json
import time
from dw_base.scheduler.polling_scheduler import get_sink_count
from dw_base.utils.config_utils import parse_args
from dw_base.spark.spark_sql import SparkSQL
import random
def send_dingtalk_notification(msg, timeout=10):
    """Send *msg* as a text message through the DingTalk robot webhook.

    Parameters
    ----------
    msg : str
        Message body; the configured mobile number is @-mentioned.
    timeout : float, optional
        Seconds to wait for the webhook (default 10). The original code
        passed no timeout, so a stalled webhook could block the job forever.

    Returns
    -------
    requests.Response
        The webhook response (existing callers that ignore the return
        value are unaffected).

    Raises
    ------
    requests.HTTPError
        If the webhook answers with a non-2xx status.
    requests.Timeout
        If the webhook does not respond within *timeout* seconds.
    """
    # SECURITY NOTE(review): the access token is hard-coded in source;
    # consider moving it to configuration or an environment variable.
    url = ('https://oapi.dingtalk.com/robot/send?access_token='
           'bda512e1f980c8d126361afbae9d744e9885705ce6ed047395a1f6bc4114114d')
    headers = {"Content-Type": "application/json"}
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {"atMobiles": ["13924570409"]},
    }
    response = requests.post(url=url, data=json.dumps(payload),
                             headers=headers, timeout=timeout)
    response.raise_for_status()
    return response
if __name__ == '__main__':
    # Parse CLI key/value options: the date window, the Mongo source
    # db/table being tracked, and the batch identifier of this run.
    CONFIG, _ = parse_args(sys.argv[1:])
    start_date = CONFIG.get('start-date')
    stop_date = CONFIG.get('stop-date')
    mgdb = CONFIG.get('mgdb')
    mgtbl = CONFIG.get('mgtbl')
    batch_id = CONFIG.get('batch_id')
    # Today's date string (YYYYMMDD) — used as the Hive partition key dt.
    cdt =f"{time.strftime('%Y%m%d', time.localtime())}"
    # Rows sunk for this table in the window; only notify when non-zero.
    count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
    spark = SparkSQL()
    # NOTE(review): writes a private attribute of SparkSQL directly —
    # presumably SparkSQL applies _final_spark_config when it builds the
    # session; confirm against dw_base.spark.spark_sql.
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true',
                                 'spark.executor.memory': '6g',
                                 'spark.executor.memoryOverhead': '2048',
                                 'spark.driver.memory': '4g',
                                 'spark.executor.instances': "15",
                                 'spark.executor.cores': '2'
                                 }
    if count > 0 :
        try:
            # Candidate delay values (seconds). A random stagger reduces the
            # chance that concurrent runs race on the count-then-insert
            # sequence below for the same dt partition.
            delay_times = [0,10,18,26,39,45,52,60,70,80,90,100]
            delay = random.choice(delay_times)
            print(f"随机延时时间为:{delay}秒")
            time.sleep(delay)
            # How many notification rows already exist today — used as a
            # per-day ordinal (order_id) in the message header.
            sql = (f"select count(1) as cnt from task.cts_incr_updated_data_cnt "
                   f"where dt = '{cdt}'")
            res = spark.query(sql)[0].collect()
            order_id= int(res[0].cnt +1)
            # Record this run's count into the tracking table
            # (dynamic partition dt = cdt; see reference DDL at file end).
            sql_insert_cnt = f"""
            insert into table task.cts_incr_updated_data_cnt
            select '{mgdb}','{mgtbl}',{count},'{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}','{cdt}'
            """
            spark.query(sql_insert_cnt)[0].collect()
            msg = (f"数据上新提醒 @13924570409\n"
                   f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} ({order_id})\n"
                   f"{mgdb}.{mgtbl} 今日新增数据量: {count} 已入库完毕,\n调用接口成功,正在刷es索引!"
                   f"本批数据batch_id为: {batch_id} "
                   )
            # Echo the notification locally (NORM_* color codes come from
            # `from dw_base import *`).
            pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} ({order_id})'
                         f'{NORM_MGT}已发送通知: {NORM_GRN} {msg} ')
            send_dingtalk_notification(msg)
        except Exception as e:
            # Best-effort: a failed notification must not fail the job,
            # so any error is only printed.
            print(f"发生错误: {e}")
#
# Reference DDL for the tracking table written above (kept for documentation):
#
# CREATE TABLE task.`cts_incr_updated_data_cnt`
# (
#     `mgdb`         STRING COMMENT 'mgdb',
#     `mgtbl`        STRING COMMENT 'mgtbl',
#     `count`        int    COMMENT 'row count',
#     `created_time` STRING COMMENT 'time the row was recorded'
# )
# COMMENT 'cts_incr_updated_data_cnt'
# PARTITIONED BY (`dt` STRING)
# STORED AS ORC
# tblproperties ('orc.compress' = 'ZLIB')