import os
import re
import sys
from datetime import datetime, timedelta

import requests

abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
from dw_base.utils.log_utils import pretty_print
- """
- author: HQL
- create_time:2024-04-28
- update_time:2024-04-28
- remarks:
- 该脚本用于将flume从kafka拉取到HDFS的文件,从ent_raw.ent_crawler_base表中,读取对应topic的表,自动将建过表的数据,抽取到相应表中
- -base:【可选】默认为 ent_raw.ent_crawler_base,kafka原始数据表得位置
- -topics:【必须】需要同步的topic,可以传入多个,使用逗号','分隔;例如-topics=topic1,topic2
- -dt:【可选】需要同步的日期,默认是当日;例如-dt=20240401
- -table:【可选】需要同步的数据表,当-table不为空时,只能传入一个topic;例如-table=table1
- """
# Create the Spark helper object
spark = SparkSQL()

NORM_MGT: str = '\033[0;35m'  # magenta, used to highlight warnings
NORM_GRN: str = '\033[0;32m'  # green, resets the highlight
def executor(topics, dt, table, base_table):
    """
    Args:
        topics: list of Kafka topic names as stored in the Hive table
        dt: date (partition) to sync
        table: optional single table to sync
        base_table: raw Kafka data table, e.g. ent_raw.ent_crawler_base
    Returns: nothing; the INSERT statements are executed directly
    """
    # 0. If a table is passed explicitly, run the insert for that table only
    if table:
        exe_sql = get_execute_sql(table, topics[0], dt, base_table)
        if exe_sql is not None:
            spark.query(exe_sql)
    else:
        # 1. Loop over the topics and discover which tables each topic contains
        for topic in topics:
            sql = f"""
                SELECT get_json_object(ori_json,'$.table') table_name FROM {base_table}
                WHERE dt = '{dt}'
                AND topic = '{topic}'
                GROUP BY get_json_object(ori_json,'$.table')
            """
            tables = spark.query(sql)[0].collect()
            if not tables:
                pretty_print(f'{NORM_MGT}Topic {topic} has no data for {dt} yet\n{NORM_GRN}')
            else:
                # 2. For each table, look up its columns in Hive and run the insert
                for topic_table in tables:
                    topic_table = topic_table[0].replace('ods_', 'ent_')
                    exe_sql = get_execute_sql(topic_table, topic, dt, base_table)
                    if exe_sql is not None:
                        spark.query(exe_sql)
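# For reference: `tables` above is a list of Spark Row objects keyed by the alias in the discovery
# query, e.g. [Row(table_name='ods_company'), Row(table_name='ods_person')] (the table names are
# placeholders); the 'ods_' prefix is swapped for 'ent_' to obtain the target table in ent_ods.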
def get_execute_sql(tbl, topic, dt, base_table):
    """
    Args:
        tbl: Hive table name (without the database prefix)
        topic: Kafka topic the data comes from
        dt: date (partition) to sync
        base_table: raw Kafka data table
    Returns:
        An INSERT statement of the form
        "SELECT cols FROM tablename WHERE dt = {dt} AND topic = {topic}
         LATERAL VIEW json_tuple(ori_json, cols) parse_json AS cols",
        or None if the target table does not exist.
    """
    if not tbl:
        pretty_print(f'{NORM_MGT}Invalid argument tbl = {tbl}\n{NORM_GRN}')
        return None
    sql = f'DESC ent_ods.{tbl}'
    # Check that the target table exists and fetch its column description
    try:
        desc_rows = spark.query(sql)[0].collect()
    except Exception:
        pretty_print(f'{NORM_MGT}Table ent_ods.{tbl} not found, please create it before running\n{NORM_GRN}')
        # Report the error to DingTalk
        response_text = f"Alert:\n\tTable ent_ods.{tbl} not found, please create it before use"
        dingtalk(response_text)
    else:
        # Parse the DESC output and collect the column names, stopping at the
        # "# Partition Information" section and skipping the partition column dt
        column_names = []
        for row in desc_rows:
            if row.col_name == "# Partition Information":
                break
            if row.col_name == "dt":
                continue
            # if row.col_name in ('_id', 'date', 'desc'):
            #     column_names.append("`" + row.col_name + "`")
            column_names.append(row.col_name)
        # Build the SELECT list and the quoted argument list for json_tuple
        select_query = ",".join(column_names)
        lateral_select_query = ",".join([f"'{column}'" for column in column_names])
        kafka_table = tbl.replace('ent_', 'ods_')
        sql = f"""
            INSERT OVERWRITE TABLE ent_ods.{tbl} PARTITION (dt={dt})
            SELECT {select_query} FROM ( SELECT ori_json FROM {base_table}
            WHERE dt = '{dt}'
            AND topic = '{topic}'
            AND get_json_object(ori_json, '$.table') = '{kafka_table}' ) t
            LATERAL VIEW json_tuple(ori_json, {lateral_select_query}) parse_json AS {select_query}
        """
        return sql
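# For reference, for a hypothetical target table ent_ods.ent_company with columns id and name,
# synced from topic1 on 20240401 (all names here are placeholders), the generated statement
# renders roughly as:
#   INSERT OVERWRITE TABLE ent_ods.ent_company PARTITION (dt=20240401)
#   SELECT id,name FROM ( SELECT ori_json FROM ent_raw.ent_crawler_base
#   WHERE dt = '20240401'
#   AND topic = 'topic1'
#   AND get_json_object(ori_json, '$.table') = 'ods_company' ) t
#   LATERAL VIEW json_tuple(ori_json, 'id','name') parse_json AS id,name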
def dingtalk(response_text):
    """
    Args:
        response_text: message content to send to the DingTalk robot
    Returns: nothing
    """
    webhook_url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=166d3462282cb6382ef88e7b67d9e06903095172612d44d8a7b94b5ab96976e2'
    # Build the JSON payload for the DingTalk robot
    json_data = {
        "msgtype": "text",
        "text": {
            "content": response_text
        },
        "at": {
            "atMobiles": ['15333978057'],
            "isAtAll": False
        }
    }
    headers = {"Content-Type": "application/json"}
    # Send the HTTP POST request to the DingTalk robot
    requests.post(webhook_url, json=json_data, headers=headers)
if __name__ == '__main__':
    # Parse the command-line arguments
    CONFIG, _ = parse_args(sys.argv[1:])
    base_table = CONFIG.get('base')
    topics = CONFIG.get('topics')
    dt = CONFIG.get('dt')
    table = CONFIG.get('table')
    if base_table is None or base_table == '':
        base_table = 'ent_raw.ent_crawler_base'
    if topics is None or topics == '':
        pretty_print(f'{NORM_MGT}Please pass valid topic names!\n{NORM_GRN}')
        pretty_print(f'{NORM_MGT}-topics=topic1,topic2\n{NORM_GRN}')
        sys.exit()
    topics = [t.strip() for t in topics.split(',')]
    if dt is None:
        dt = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
    if table:
        if len(topics) > 1:
            pretty_print(f'{NORM_MGT}When -table is given, exactly one -topic may be passed\n{NORM_GRN}')
            pretty_print(f'{NORM_MGT}-topic = {topics} -table = {table}\n{NORM_GRN}')
            sys.exit()
    # Run the insert job
    executor(topics, dt, table, base_table)