#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
-- Set the SparkSession name (for lineage analysis)
SET spark.app.name=excel_to_hive;
-- Set Spark configuration
SET spark.xxx.yyy.zzz=xyz;
-- Reference a UDF
ADD FILE dw_base/udf/business/spark_xxx_udf.py;
-- Declare a variable
SET TOPIC=xxx;
-- Number of rows to inspect
SET LIMIT=1000;
"""
import argparse
import json
import os
import re
import sys

import pandas as pd
from pyspark.sql import SparkSession

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)


class Excel2HiveUtil:
    def __init__(self):
        self.base_dir = base_dir

    def run(self, excel_position: str, db_table: str, topic: str, dt: str, skip_rows=0):
        spark = SparkSession.builder \
            .appName("ExcelToHive") \
            .master("yarn") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', True) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()

        # Resolve relative paths against the project base directory
        if not excel_position.startswith('/'):
            full_path = os.path.join(self.base_dir, excel_position)
        else:
            full_path = excel_position

        # Read the Excel data as strings and blank out missing values
        excel_data = pd.read_excel(full_path, dtype=str, skiprows=skip_rows)
        excel_data.fillna(value='', inplace=True)

        # Serialize each row to a JSON object keyed col1..colN,
        # stripping newlines and tabs from every cell value
        json_data = excel_data.apply(
            lambda row: json.dumps({f'col{i + 1}': str(re.sub(r'[\n\t]', '', val)) for i, val in enumerate(row)}),
            axis=1)
        pandas_df = pd.DataFrame({'ori_json': json_data})
        pandas_df['dt'] = dt
        pandas_df['topic'] = topic

        spark_df = spark.createDataFrame(pandas_df)
        # # Write data to Hive table with partitions
        # spark_df.write.mode('overwrite').partitionBy("dt", "topic").saveAsTable(db_table)
        spark_df.createOrReplaceTempView("temp_view")
        spark.sql(
            f"""
            INSERT OVERWRITE TABLE {db_table} PARTITION (dt='{dt}', topic='{topic}')
            SELECT ori_json FROM temp_view
            """)
        spark.stop()


def usage():
    # Note: kept for reference; argparse already provides -h/--help, so this
    # function is not called anywhere in the script.
    print(
        f'Usage: {sys.argv[0]}\n'
        f'  [-h/--help]   Print script usage\n'
        f'  [-p/--path ]  Path to the Excel file to convert (required)\n'
        f'  [-t/--topic ] Topic to insert into the Hive table (required)\n'
        f'  [-s/--skip ]  Number of rows to skip in the Excel file (optional, default 0)\n'
        f'  [-d/--dt ]    Date to insert into the Hive table (optional, default 19700101)\n'
    )
    sys.exit(1)


def parse_args():
    parser = argparse.ArgumentParser(description='Excel to Hive')
    parser.add_argument('-p', '--path', type=str, required=True, help='Path to the Excel file')
    parser.add_argument('-t', '--topic', type=str, required=True, help='Topic name to insert into Hive table')
    parser.add_argument('-s', '--skip', type=int, default=0, help='Number of rows to skip in the Excel file')
    parser.add_argument('-d', '--dt', type=str, default='19700101',
                        help='Date to insert into Hive table, default is 19700101')
    args = parser.parse_args()
    config = {
        'path': args.path,
        'topic': args.topic,
        'skip': args.skip,
        'dt': args.dt
    }
    return config


if __name__ == '__main__':
    config = parse_args()
    excel_position = config['path']
    db_table = 'ent_raw.manual_import_data'
    topic = config['topic']
    skip_rows = config['skip']
    dt = config['dt']
    Excel2HiveUtil().run(excel_position, db_table, topic, dt, skip_rows)
    print("================= Transfer completed! =======================")
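
# --- Example invocation (illustrative; the script file name, Excel path, and
# --- topic below are hypothetical and not part of the original script):
#
#   python3 excel_to_hive.py -p data/companies.xlsx -t company_list -s 1 -d 20240101
#
# This would skip the first row of data/companies.xlsx, wrap each remaining row
# as a JSON string keyed col1..colN, and overwrite the Hive partition
# (dt='20240101', topic='company_list') of ent_raw.manual_import_data.
# Downstream consumers could then extract individual cells from ori_json, e.g.
# (a sketch, assuming Hive's built-in get_json_object UDF is available):
#
#   SELECT get_json_object(ori_json, '$.col1') AS first_column
#   FROM ent_raw.manual_import_data
#   WHERE dt = '20240101' AND topic = 'company_list';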