# spark_parse_json_to_hive.py

import os
import re
import sys
from datetime import datetime, timedelta

import requests

# Make the repo root importable before pulling in dw_base modules
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.hdfs_merge_small_file import hdfs_estimate_num_partitions_absolute_path
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
from dw_base.utils.log_utils import pretty_print
  13. """
  14. author: HQL
  15. create_time:2024-04-28
  16. update_time:2024-04-28
  17. remarks:
  18. 该脚本用于将flume从kafka拉取到HDFS的文件,从ent_raw.ent_crawler_base表中,读取对应topic的表,自动将建过表的数据,抽取到相应表中
  19. -base:【可选】默认为 ent_raw.ent_crawler_base,kafka原始数据表得位置
  20. -topics:【必须】需要同步的topic,可以传入多个,使用逗号','分隔;例如-topics=topic1,topic2
  21. -dt:【可选】需要同步的日期,默认是当日;例如-dt=20240401
  22. -table:【可选】需要同步的数据表,当-table不为空时,只能传入一个topic;例如-table=table1
  23. """
# Get the Spark session wrapper
spark = SparkSQL()
# ANSI color codes: magenta for warnings, green to restore the normal console color
NORM_MGT: str = '\033[0;35m'
NORM_GRN: str = '\033[0;32m'


def executor(topics, dt, table, base_table):
    """
    Args:
        topics: Kafka topic names as recorded in the Hive raw table
        dt: date of the data to sync
        table: specific table to sync, if any
        base_table: raw Kafka data table to read from
    Returns: nothing; the insert statements are executed directly
    """
    # 0. If a table was passed in, run the insert for that table only
    if table:
        exe_sql = get_execute_sql(table, topics[0], dt, base_table)
        if exe_sql is not None:
            spark.query(exe_sql)
    else:
        # 1. Loop over the topics and find out which tables each topic contains
        for topic in topics:
            # Read the distinct table names present in this topic
            sql = f"""
            SELECT get_json_object(ori_json, '$.table') table_name FROM {base_table}
            WHERE dt = '{dt}'
            AND topic = '{topic}'
            GROUP BY get_json_object(ori_json, '$.table')
            """
            tables = spark.query(sql)[0].collect()
            if not tables:
                pretty_print(f'{NORM_MGT}Topic {topic} has no data for {dt} yet\n{NORM_GRN}')
            else:
                # 2. For each table, look up its column information in Hive
                for topic_table in tables:
                    # Kafka records use the ods_ prefix; Hive targets use ent_
                    topic_table = topic_table[0].replace('ods_', 'ent_')
                    exe_sql = get_execute_sql(topic_table, topic, dt, base_table)
                    if exe_sql is not None:
                        spark.query(exe_sql)


def get_execute_sql(tbl, topic, dt, base_table):
    """
    Args:
        tbl: Hive table name
        topic: Kafka topic name
        dt: date of the data to sync
        base_table: raw Kafka data table to read from
    Returns:
        The assembled statement:
        select cols from tablename where dt = {dt} and topic = {topic}
        lateral view json_tuple(ori_json, cols) parse_json as cols
    """
    if not tbl:
        pretty_print(f'{NORM_MGT}Invalid argument: tbl = {tbl}\n{NORM_GRN}')
        return None
    sql = f'DESC ent_ods.{tbl}'
    # Check that the target table exists by running DESC against it
    try:
        spark.query(sql)[0]
    except Exception:
        pretty_print(f'{NORM_MGT}Table {tbl} not found; please create it before running\n{NORM_GRN}')
        # Report the error to DingTalk
        response_text = f"Alert:\n\tTable ent_ods.{tbl} not found; please create it before use"
        dingtalk(response_text)
    else:
        # Parse the DESC output and collect the column names
        column_names = []
        for row in spark.query(sql)[0].collect():
            if row.col_name == "# Partition Information":
                break
            if row.col_name == "dt":
                continue
            # Note: reserved column names such as '_id', 'date', 'desc' may need
            # backtick quoting: column_names.append("`" + row.col_name + "`")
            column_names.append(row.col_name)
        # Join the column names for the SELECT list and the json_tuple arguments
        select_query = ",".join(column_names)
        lateral_select_query = ",".join([f"'{column}'" for column in column_names])
        kafka_table = tbl.replace('ent_', 'ods_')
        sql = f"""
        INSERT OVERWRITE TABLE ent_ods.{tbl} PARTITION (dt={dt})
        SELECT {select_query} FROM ( SELECT ori_json FROM {base_table}
        WHERE dt = '{dt}'
        AND topic = '{topic}'
        AND get_json_object(ori_json, '$.table') = '{kafka_table}' ) t
        LATERAL VIEW json_tuple(ori_json, {lateral_select_query}) parse_json AS {select_query}
        """
        return sql
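
# For illustration, with a hypothetical target table ent_demo holding columns (id, name),
# the generated statement would look roughly like this (placeholder names throughout):
#   INSERT OVERWRITE TABLE ent_ods.ent_demo PARTITION (dt=20240401)
#   SELECT id,name FROM ( SELECT ori_json FROM ent_raw.ent_crawler_base
#   WHERE dt = '20240401' AND topic = 'topic1'
#   AND get_json_object(ori_json, '$.table') = 'ods_demo' ) t
#   LATERAL VIEW json_tuple(ori_json, 'id','name') parse_json AS id,name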


def dingtalk(response_text):
    """
    Args:
        response_text: message content to send to the DingTalk robot
    Returns: nothing
    """
    webhook_url = 'http://m1.node.cdh/dingtalk/api/robot/send?access_token=166d3462282cb6382ef88e7b67d9e06903095172612d44d8a7b94b5ab96976e2'
    # Build the JSON payload for the DingTalk robot
    json_data = {
        "msgtype": "text",
        "text": {
            "content": response_text
        },
        "at": {
            "atMobiles": ['15333978057'],
            "isAtAll": False
        }
    }
    headers = {"Content-Type": "application/json"}
    # Send the HTTP POST request to the DingTalk robot
    response = requests.post(webhook_url, json=json_data, headers=headers)
    if response.status_code != 200:
        pretty_print(f'{NORM_MGT}DingTalk notification failed: HTTP {response.status_code}\n{NORM_GRN}')
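
# The webhook can be smoke-tested outside Spark with curl (assuming the token in
# webhook_url above is still valid; $WEBHOOK_URL stands in for that full URL):
#   curl -X POST -H 'Content-Type: application/json' \
#        -d '{"msgtype": "text", "text": {"content": "test"}}' "$WEBHOOK_URL"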


if __name__ == '__main__':
    # Parse the command-line arguments
    CONFIG, _ = parse_args(sys.argv[1:])
    base_table = CONFIG.get('base')
    topics = CONFIG.get('topics')
    dt = CONFIG.get('dt')
    table = CONFIG.get('table')
    if not base_table:
        base_table = 'ent_raw.ent_crawler_base'
    if not topics:
        pretty_print(f'{NORM_MGT}Please pass a valid topic name!\n{NORM_GRN}')
        pretty_print(f'{NORM_MGT}-topics=topic1,topic2\n{NORM_GRN}')
        sys.exit()
    topics = topics.split(',')
    if dt is None:
        # Default to the previous day
        dt = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
    if table:
        if len(topics) > 1:
            pretty_print(f'{NORM_MGT}When -table is passed, exactly one -topic may be passed\n{NORM_GRN}')
            pretty_print(f'{NORM_MGT}-topics = {topics} -table = {table}\n{NORM_GRN}')
            sys.exit()
    # Run the insert job
    executor(topics, dt, table, base_table)