#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
-- Set the SparkSession name (used for lineage analysis)
SET spark.app.name=excel_to_hive;
-- Set a Spark configuration value
SET spark.xxx.yyy.zzz=xyz;
-- Reference a UDF
ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
-- Declare a variable
SET TOPIC=xxx;
-- Number of rows to preview
SET LIMIT=1000;
"""
import argparse
import json
import os
import re
import sys

import pandas as pd
from pyspark.sql import SparkSession

# Make the project root importable regardless of where the script is launched from.
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)


class Excel2HiveUtil:
    def __init__(self):
        self.base_dir = base_dir

    def run(self, excel_position: str, db_table: str, topic: str, dt: str, skip_rows: int = 0):
        spark = SparkSession.builder \
            .appName("ExcelToHive") \
            .master("yarn") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', True) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
            .config('spark.yarn.queue', 'default') \
            .enableHiveSupport() \
            .getOrCreate()
        # Resolve relative paths against the project root.
        if not excel_position.startswith('/'):
            full_path = os.path.join(self.base_dir, excel_position)
        else:
            full_path = excel_position

        # Read the Excel data as strings so values are not coerced to numbers/dates.
        excel_data = pd.read_excel(full_path, dtype=str, skiprows=skip_rows)
        excel_data.fillna(value='', inplace=True)

        # Pack each row into a JSON object keyed col1..colN,
        # e.g. {"col1": "...", "col2": "..."}, stripping newlines and tabs.
        json_data = excel_data.apply(
            lambda row: json.dumps({f'col{i + 1}': re.sub(r'[\n\t]', '', str(val)) for i, val in enumerate(row)}),
            axis=1)
        pandas_df = pd.DataFrame({'ori_json': json_data})
        pandas_df['dt'] = dt
        pandas_df['topic'] = topic
        spark_df = spark.createDataFrame(pandas_df)

        # # Write data to Hive table with partitions
        # spark_df.write.mode('overwrite').partitionBy("dt", "topic").saveAsTable(db_table)
        spark_df.createOrReplaceTempView("temp_view")
        spark.sql(
            f"""
            INSERT OVERWRITE TABLE {db_table} PARTITION (dt='{dt}', topic='{topic}')
            SELECT ori_json FROM temp_view
            """)
        spark.stop()


def usage():
    print(
        f'Usage: {sys.argv[0]}\n'
        '  [-h/--help]                    print this usage message\n'
        '  [-p/--path <excel_file_path>]  path of the Excel file to convert (required)\n'
        '  [-t/--topic <topic>]           topic to insert into the Hive table (required)\n'
        '  [-s/--skip <skip_rows>]        number of leading rows to skip in the Excel file (optional, default 0)\n'
        '  [-d/--dt <date>]               date to insert into the Hive table (optional, default 19700101)\n'
    )
    sys.exit(1)


def parse_args():
    parser = argparse.ArgumentParser(description='Excel to Hive')
    parser.add_argument('-p', '--path', type=str, required=True, help='Path to the Excel file')
    parser.add_argument('-t', '--topic', type=str, required=True, help='Topic name to insert into Hive table')
    parser.add_argument('-s', '--skip', type=int, default=0, help='Number of rows to skip in the Excel file')
    parser.add_argument('-d', '--dt', type=str, default='19700101',
                        help='Date to insert into Hive table, default is 19700101')
    args = parser.parse_args()
    config = {
        'path': args.path,
        'topic': args.topic,
        'skip': args.skip,
        'dt': args.dt
    }
    return config


if __name__ == '__main__':
    config = parse_args()
    excel_position = config['path']
    db_table = 'ent_raw.manual_import_data'
    topic = config['topic']
    skip_rows = config['skip']
    dt = config['dt']
    Excel2HiveUtil().run(excel_position, db_table, topic, dt, skip_rows)
    print("================= Transfer completed! =======================")