import os
import re
import sys
from datetime import datetime, timedelta

import requests

abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
from dw_base.utils.log_utils import pretty_print

"""
author: HQL
create_time: 2024-04-28
update_time: 2024-04-28
remarks:
    This script processes the files that Flume pulled from Kafka into HDFS. For each
    topic it reads the table names found in ent_raw.ent_crawler_base and extracts the
    data of every table that has already been created into the matching ent_ods table.
    -base:   [optional] location of the raw Kafka data table; defaults to ent_raw.ent_crawler_base
    -topics: [required] topics to sync; pass several separated by commas, e.g. -topics=topic1,topic2
    -dt:     [optional] date to sync; defaults to the previous day, e.g. -dt=20240401
    -table:  [optional] table to sync; when -table is given, exactly one topic may be passed, e.g. -table=table1
"""

# Spark session wrapper
spark = SparkSQL()

# ANSI color codes used in log output
NORM_MGT: str = '\033[0;35m'  # magenta
NORM_GRN: str = '\033[0;32m'  # green


def executor(topics, dt, table, base_table):
    """
    Args:
        topics: Kafka topic names as stored in the Hive table
        dt: date of the data to sync
        table: a single table to sync; when set, only topics[0] is used
        base_table: raw Kafka data table to read from
    Returns: nothing; the INSERT statements are executed directly
    """
    # 0. When a table is passed explicitly, run the insert for that table only
    if table:
        exe_sql = get_execute_sql(table, topics[0], dt, base_table)
        if exe_sql is not None:
            spark.query(exe_sql)
    else:
        # 1. Loop over the topics and find out which tables each one contains
        for topic in topics:
            sql = f"""
                SELECT get_json_object(ori_json, '$.table') table_name
                FROM {base_table}
                WHERE dt = '{dt}'
                AND topic = '{topic}'
                GROUP BY get_json_object(ori_json, '$.table')
            """
            tables = spark.query(sql)[0].collect()
            if not tables:
                pretty_print(f'{NORM_MGT}topic {topic} has no data for {dt} yet\n{NORM_GRN}')
            else:
                # 2. For each table, look up its columns in Hive, then build and run the insert
                for topic_table in tables:
                    topic_table = topic_table[0].replace('ods_', 'ent_')
                    exe_sql = get_execute_sql(topic_table, topic, dt, base_table)
                    if exe_sql is not None:
                        spark.query(exe_sql)


def get_execute_sql(tbl, topic, dt, base_table):
    """
    Args:
        tbl: Hive table name (ent_ prefixed)
        topic: Kafka topic the table's rows arrive on
        dt: partition date
        base_table: raw Kafka data table to read from
    Returns:
        the assembled statement
            INSERT OVERWRITE ... SELECT cols FROM {base_table}
            WHERE dt = '{dt}' AND topic = '{topic}'
            LATERAL VIEW json_tuple(ori_json, cols) parse_json AS cols
        or None when the target table does not exist
    """
    if not tbl:
        pretty_print(f'{NORM_MGT}invalid argument tbl = {tbl}\n{NORM_GRN}')
        return None
    sql = f'DESC ent_ods.{tbl}'
    # Run DESC to verify the table exists and fetch its column descriptions
    try:
        desc_df = spark.query(sql)[0]
    except Exception:
        pretty_print(f'{NORM_MGT}table {tbl} not found, create it before running\n{NORM_GRN}')
        # Report the problem to DingTalk
        response_text = f"Alert:\n\ttable ent_ods.{tbl} not found, please create it before use"
        dingtalk(response_text)
        return None

    # Parse the column descriptions and collect the column names
    column_names = []
    for row in desc_df.collect():
        # Partition columns follow this marker row; stop before them
        if row.col_name == "# Partition Information":
            break
        # dt is the partition column and is supplied via PARTITION (dt=...)
        if row.col_name == "dt":
            continue
        # Reserved words may need backtick quoting, e.g.:
        # if row.col_name in ('_id', 'date', 'desc'):
        #     column_names.append("`" + row.col_name + "`")
        column_names.append(row.col_name)

    # Join the column names for the SELECT list and the json_tuple arguments
    select_query = ",".join(column_names)
    lateral_select_query = ",".join([f"'{column}'" for column in column_names])
    kafka_table = tbl.replace('ent_', 'ods_')
    sql = f"""
        INSERT OVERWRITE TABLE ent_ods.{tbl} PARTITION (dt={dt})
        SELECT {select_query}
        FROM (
            SELECT ori_json
            FROM {base_table}
            WHERE dt = '{dt}'
            AND topic = '{topic}'
            AND get_json_object(ori_json, '$.table') = '{kafka_table}'
        ) t
        LATERAL VIEW json_tuple(ori_json, {lateral_select_query}) parse_json AS {select_query}
    """
    return sql


def dingtalk(response_text):
    """
    Args:
        response_text: message body for the DingTalk robot
    Returns: nothing
    """
    webhook_url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=166d3462282cb6382ef88e7b67d9e06903095172612d44d8a7b94b5ab96976e2'
    # Build the JSON payload for the DingTalk robot
    json_data = {
        "msgtype": "text",
        "text": {
            "content": response_text
        },
        "at": {
            "atMobiles": ['15333978057'],
            "isAtAll": False
        }
    }
    headers = {"Content-Type": "application/json"}
    # Send the HTTP POST request to the DingTalk robot
    requests.post(webhook_url, json=json_data, headers=headers)
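

# For reference, a sketch of the statement that get_execute_sql() assembles.
# The table and column names (ent_example, id, name) are illustrative only;
# the real column list comes from the DESC output of the target table:
#
#   INSERT OVERWRITE TABLE ent_ods.ent_example PARTITION (dt=20240401)
#   SELECT id,name
#   FROM (
#       SELECT ori_json
#       FROM ent_raw.ent_crawler_base
#       WHERE dt = '20240401'
#       AND topic = 'topic1'
#       AND get_json_object(ori_json, '$.table') = 'ods_example'
#   ) t
#   LATERAL VIEW json_tuple(ori_json, 'id','name') parse_json AS id,name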


if __name__ == '__main__':
    # Parse command line arguments
    CONFIG, _ = parse_args(sys.argv[1:])
    base_table = CONFIG.get('base')
    topics = CONFIG.get('topics')
    dt = CONFIG.get('dt')
    table = CONFIG.get('table')
    if not base_table:
        base_table = 'ent_raw.ent_crawler_base'
    if not topics:
        pretty_print(f'{NORM_MGT}please provide a valid topic name!\n{NORM_GRN}')
        pretty_print(f'{NORM_MGT}-topics=topic1,topic2\n{NORM_GRN}')
        sys.exit()
    topics = topics.split(',')
    if dt is None:
        # Default to yesterday's partition
        dt = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
    if table and len(topics) > 1:
        pretty_print(f'{NORM_MGT}when -table is given, exactly one -topic must be passed\n{NORM_GRN}')
        pretty_print(f'{NORM_MGT}-topics = {topics} -table = {table}\n{NORM_GRN}')
        sys.exit()
    # Run the extraction
    executor(topics, dt, table, base_table)
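
# Example invocations (the script filename below is a placeholder; launch with
# plain python or spark-submit according to your dw_base setup):
#   python kafka_to_ods.py -topics=topic1,topic2 -dt=20240401
#   python kafka_to_ods.py -topics=topic1 -table=ent_example_table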