import inspect
import os
import random
import re
import string
import sys
from typing import Tuple

abspath = os.path.abspath(__file__)
# Locate the project root by trimming everything after "tendata-warehouse",
# so dw_base imports resolve no matter where the script is launched from.
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from importlib import import_module
from pyspark.sql import SparkSession
from dw_base.spark.udf.spark_common_udf import *


def read_file(file_path: str) -> Tuple[list, str]:
    """Parse a SQL file: collect UDF modules declared via ADD FILE lines and join the remaining lines into one SQL string."""
    module_list = []
    sql_list = []
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip().startswith("ADD FILE"):
                # "ADD FILE dw_base/spark/udf/foo.py" -> "dw_base.spark.udf.foo"
                module = line.strip()[len("ADD FILE"):].split('.')[0].replace("/", ".")
                print(f'Module to import: {module}')
                module_list.append(module.strip())
            elif line.strip().startswith("--"):
                # Skip SQL comment lines
                continue
            else:
                sql_list.append(line)
    sql = " ".join(sql_list)
    return module_list, sql
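
# A minimal sketch of the file layout read_file() expects. The module path
# mirrors the dw_base import above; the UDF, table, and column names are
# purely illustrative:
#
#   ADD FILE dw_base/spark/udf/spark_common_udf.py
#   -- lines starting with "--" are treated as comments and skipped
#   SELECT my_udf(col_a) AS a
#   FROM dw.example_table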


def register_udf(spark: SparkSession, udf_files: list):
    """Import each module and register every top-level function it defines as a Spark SQL UDF under the function's own name."""
    for f in udf_files:
        module = import_module(f)
        for name, value in inspect.getmembers(module):
            if inspect.isfunction(value):
                print(f'registering udf --> name: {name}, value: {value}')
                # spark.sparkContext.addPyFile("dw_base/spark/udf/spark_json_array_udf.py")
                spark.udf.register(name, value)
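
# A sketch of what a UDF module consumed by register_udf() might contain.
# The file name and function are hypothetical; any top-level function in the
# module would be picked up and registered under its own name:
#
#   # dw_base/spark/udf/spark_common_udf.py
#   def upper_trim(s):
#       return s.strip().upper() if s else None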


def generate_random_string(length):
    # Character set of upper/lowercase letters and digits
    characters = string.ascii_letters + string.digits
    # Build a random string of the requested length
    return ''.join(random.choice(characters) for _ in range(length))


class Hive2ExcelUtil:

    def __init__(self):
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.BASE_DIR = os.path.dirname(base_dir)
        # spark.sql.execution.arrow.enabled was set twice (True, then False);
        # the later value wins in the builder, so only False is kept here.
        self.spark = SparkSession.builder \
            .appName("HiveToExcel") \
            .master("local[4]") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', False) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()
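
    # Note: master("local[4]") runs Spark in local mode with four threads, so
    # the spark.yarn.queue setting above takes effect only if the master is
    # changed to yarn. enableHiveSupport() needs a reachable Hive metastore
    # (e.g. hive-site.xml on the classpath); without one, Spark falls back to
    # a local Derby-backed metastore.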

    def transfer(self, excel_position: str, sql_file_position: str):
        # Resolve relative paths against the project base directory
        excel_full_path = excel_position
        sql_full_path = sql_file_position
        if not excel_position.startswith("/"):
            excel_full_path = os.path.join(self.BASE_DIR, excel_position)
        print(f'The Excel file will be written to: {excel_full_path}')
        if not sql_file_position.startswith("/"):
            sql_full_path = os.path.join(self.BASE_DIR, sql_file_position)
        print(f'The SQL file will be read from: {sql_full_path}')
        module_list, sql = read_file(sql_full_path)
        print(f"Executing SQL: {sql}")
        register_udf(self.spark, module_list)
        pd_df = self.spark.sql(sql).toPandas()
        # Writing .xlsx requires an Excel engine such as openpyxl to be installed
        pd_df.to_excel(excel_full_path, index=False)


if __name__ == '__main__':
    sql_file_position = input("Enter the relative or absolute path of your SQL file: ").strip()
    excel_position = input("Enter the relative or absolute path of the Excel file to generate: ").strip()
    if excel_position.endswith("/"):
        # Only a directory was given: generate a random file name inside it
        excel_position = f'{excel_position}{generate_random_string(10)}_output.xlsx'
    else:
        # Normalize the extension to .xlsx (splitext strips only the final
        # extension, so paths containing dots stay intact)
        excel_path_name = os.path.splitext(excel_position)[0]
        excel_position = f'{excel_path_name}.xlsx'
    Hive2ExcelUtil().transfer(excel_position, sql_file_position)
    print("=================transfer completed!=======================")