excel_to_hive.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
-- Set the SparkSession name (used for lineage analysis)
SET spark.app.name=excel_to_hive;
-- Set a Spark configuration value
SET spark.xxx.yyy.zzz=xyz;
-- Reference a UDF
ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
-- Declare a variable
SET TOPIC=xxx;
-- Row limit when inspecting the data
SET LIMIT=1000;
"""
import argparse
import json
import os
import re
import sys

import pandas as pd
from pyspark.sql import SparkSession

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)


class Excel2HiveUtil:
    def __init__(self):
        self.base_dir = base_dir

    def run(self, excel_position: str, db_table: str, topic: str, dt: str, skip_rows=0):
        spark = SparkSession.builder \
            .appName("ExcelToHive") \
            .master("yarn") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', True) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()
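        # The two Arrow settings above let createDataFrame() further down take
        # Arrow's fast pandas-to-Spark conversion path, with a fallback to the
        # plain row-by-row conversion when Arrow is unavailable.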
        # Resolve a relative path against the project base directory.
        if not excel_position.startswith('/'):
            full_path = os.path.join(self.base_dir, excel_position)
        else:
            full_path = excel_position
        # Read the Excel data; dtype=str keeps every cell as text.
        excel_data = pd.read_excel(full_path, dtype=str, skiprows=skip_rows)
        excel_data.fillna(value='', inplace=True)
        # Serialize each row to a JSON object keyed col1..colN, stripping
        # newlines and tabs that would corrupt downstream parsing.
        json_data = excel_data.apply(
            lambda row: json.dumps({f'col{i + 1}': str(re.sub(r'[\n\t]', '', val)) for i, val in enumerate(row)}),
            axis=1)
        pandas_df = pd.DataFrame({'ori_json': json_data})
        pandas_df['dt'] = dt
        pandas_df['topic'] = topic
        spark_df = spark.createDataFrame(pandas_df)
        # # Write data to Hive table with partitions
        # spark_df.write.mode('overwrite').partitionBy("dt", "topic").saveAsTable(db_table)
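        # The dt/topic columns on spark_df only matter for the saveAsTable
        # path above; the static-partition INSERT below selects just ori_json
        # and takes the partition values from the PARTITION clause.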
        spark_df.createOrReplaceTempView("temp_view")
        spark.sql(
            f"""
            INSERT OVERWRITE TABLE {db_table} PARTITION (dt='{dt}', topic='{topic}')
            SELECT ori_json FROM temp_view
            """)
        spark.stop()


def usage():
    print(
        f'Usage: {sys.argv[0]}\n'
        f' [-h/--help]                   Print this usage message\n'
        f' [-p/--path <excel_file_path>] Path of the Excel file to convert (required)\n'
        f' [-t/--topic <topic>]          Topic to insert into the Hive table (required)\n'
        f' [-s/--skip <skip_rows>]       Number of rows in the Excel file to skip (optional, default 0)\n'
        f' [-d/--dt <date>]              Date to insert into the Hive table (optional, default 19700101)\n'
    )
    sys.exit(1)


def parse_args():
    parser = argparse.ArgumentParser(description='Excel to Hive')
    parser.add_argument('-p', '--path', type=str, required=True, help='Path to the Excel file')
    parser.add_argument('-t', '--topic', type=str, required=True, help='Topic name to insert into Hive table')
    parser.add_argument('-s', '--skip', type=int, default=0, help='Number of rows to skip in the Excel file')
    parser.add_argument('-d', '--dt', type=str, default='19700101',
                        help='Date to insert into Hive table, default is 19700101')
    args = parser.parse_args()
    config = {
        'path': args.path,
        'topic': args.topic,
        'skip': args.skip,
        'dt': args.dt
    }
    return config


if __name__ == '__main__':
    config = parse_args()
    excel_position = config['path']
    db_table = 'ent_raw.manual_import_data'
    topic = config['topic']
    skip_rows = config['skip']
    dt = config['dt']
    Excel2HiveUtil().run(excel_position, db_table, topic, dt, skip_rows)
    print("================= Transfer completed! =======================")
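
# A minimal usage sketch. The file path, topic, and date below are
# hypothetical examples; the target table ent_raw.manual_import_data is
# hard-coded above. Since the job runs on YARN, it would typically be
# launched through spark-submit:
#
#   spark-submit excel_to_hive.py -p data/import.xlsx -t some_topic -d 20240101 -s 1
#
# Imported rows can then be read back by unpacking the JSON, e.g. in
# Spark SQL or Hive:
#
#   SELECT get_json_object(ori_json, '$.col1') AS col1
#   FROM ent_raw.manual_import_data
#   WHERE dt = '20240101' AND topic = 'some_topic'
#   LIMIT 10;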