import inspect
import os
import random
import re
import string
import sys
from typing import Tuple

# Make the project root ("tendata-warehouse") importable regardless of where
# this script lives inside the repository.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from importlib import import_module

from pyspark.sql import SparkSession

from dw_base.spark.udf.spark_common_udf import *


def read_file(file_path: str) -> Tuple[list, str]:
    """Parse a SQL file: collect UDF module paths from "ADD FILE" lines and
    join the remaining non-comment lines into a single SQL string."""
    module_list = []
    sql_list = []
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip().startswith("ADD FILE"):
                # "ADD FILE a/b/c.py" -> module path "a.b.c"
                module = line.strip()[len("ADD FILE"):].split('.')[0].replace("/", ".")
                print(f'Module to import: {module}')
                module_list.append(module.strip())
            elif line.strip().startswith("--"):
                continue
            else:
                sql_list.append(line)
    sql = " ".join(sql_list)
    return module_list, sql


def register_udf(spark: SparkSession, udf_files: list):
    """Import each UDF module and register every function it defines as a Spark UDF."""
    for f in udf_files:
        module = import_module(f)
        for name, value in inspect.getmembers(module):
            if inspect.isfunction(value):
                print(f'registering udf --> name is: {name}, value is: {value}')
                # spark.sparkContext.addPyFile("dw_base/spark/udf/spark_json_array_udf.py")
                spark.udf.register(name, value)


def generate_random_string(length):
    # Character set of upper/lowercase letters and digits
    characters = string.ascii_letters + string.digits
    # Build a random string of the requested length
    return ''.join(random.choice(characters) for _ in range(length))


class Hive2ExcelUtil:
    def __init__(self):
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.BASE_DIR = os.path.dirname(base_dir)
        self.spark = SparkSession.builder \
            .appName("HiveToExcel") \
            .master("local[4]") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', False) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()

    def transfer(self, excel_position: str, sql_file_position: str):
        """Run the SQL from sql_file_position against Hive and write the result to an Excel file."""
        excel_full_path = excel_position
        sql_full_path = sql_file_position
        if not excel_position.startswith("/"):
            excel_full_path = os.path.join(self.BASE_DIR, excel_position)
            print(f'Excel file will be written to: {excel_full_path}')
        if not sql_file_position.startswith("/"):
            sql_full_path = os.path.join(self.BASE_DIR, sql_file_position)
            print(f'SQL file resolved to: {sql_full_path}')
        module_list, sql = read_file(sql_full_path)
        print(f"Executing SQL: {sql}")
        register_udf(self.spark, module_list)
        pd_df = self.spark.sql(sql).toPandas()
        pd_df.to_excel(excel_full_path, index=False)


if __name__ == '__main__':
    sql_file_position = input("Enter the relative or absolute path of your SQL file: ").strip()
    excel_position = input("Enter the relative or absolute path of the Excel file to generate: ").strip()
    if excel_position.endswith("/"):
        # Only a directory was given: generate a random file name inside it.
        excel_position = f'{excel_position}{generate_random_string(10)}_output.xlsx'
    else:
        # Normalize whatever extension was given to .xlsx.
        excel_position = f'{os.path.splitext(excel_position)[0]}.xlsx'
    Hive2ExcelUtil().transfer(excel_position, sql_file_position)
    print("=================transfer completed!=======================")
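
# ---------------------------------------------------------------------------
# Example input file (hypothetical): the table and columns below are
# illustrative only and are not taken from this repository. They show the
# layout read_file() expects: "ADD FILE" lines name UDF modules relative to
# root_path, "--" lines are treated as comments, and everything else is
# concatenated into the SQL that gets executed.
#
#     ADD FILE dw_base/spark/udf/spark_common_udf.py
#     -- export one day of orders to Excel
#     SELECT order_id, amount
#     FROM dw.orders
#     WHERE dt = '2024-01-01'
# ---------------------------------------------------------------------------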