hive_to_excel_utils.py

import inspect
import random
import string
import os
import re
import sys
from typing import Tuple

# Make the repo root importable regardless of where this script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from importlib import import_module
from pyspark.sql import SparkSession
from dw_base.spark.udf.spark_common_udf import *

def read_file(file_path: str) -> Tuple[list, str]:
    """Split a SQL script into the UDF modules it declares and the SQL to run."""
    module_list = []
    sql_list = []
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip().startswith("ADD FILE"):
                # "ADD FILE a/b/c.py" -> importable module path "a.b.c"
                module = line.strip()[len("ADD FILE"):].split('.')[0].replace("/", ".")
                print(f'Module to import: {module}')
                module_list.append(module.strip())
            elif line.strip().startswith("--"):
                continue  # skip SQL comment lines
            else:
                sql_list.append(line)
    sql = " ".join(sql_list)
    return module_list, sql
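
# A sketch of the script layout read_file() expects; the table name below is a
# hypothetical example:
#
#   ADD FILE dw_base/spark/udf/spark_common_udf.py
#   -- comment lines like this are skipped
#   SELECT * FROM some_db.some_table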

def register_udf(spark: SparkSession, udf_files: list):
    """Import each module and register every function it defines as a Spark SQL UDF."""
    for f in udf_files:
        module = import_module(f)
        for name, value in inspect.getmembers(module):
            if inspect.isfunction(value):
                print(f'registering udf --> name: {name}, value: {value}')
                # spark.sparkContext.addPyFile("dw_base/spark/udf/spark_json_array_udf.py")
                spark.udf.register(name, value)
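
# After registration each function is callable from SQL under its Python name,
# e.g. (hypothetical UDF name): spark.sql("SELECT my_udf(col) FROM t")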

def generate_random_string(length):
    # Character set: upper/lowercase letters plus digits
    characters = string.ascii_letters + string.digits
    # Draw `length` random characters and join them into one string
    random_string = ''.join(random.choice(characters) for _ in range(length))
    return random_string
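
# Illustrative call: generate_random_string(10) might return 'aZ3kP0qL9x'.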

class Hive2ExcelUtil:
    def __init__(self):
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.BASE_DIR = os.path.dirname(base_dir)
        # Local Spark session with Hive support; Arrow is disabled, so
        # toPandas() uses the row-based conversion path.
        self.spark = SparkSession.builder \
            .appName("HiveToExcel") \
            .master("local[4]") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('spark.sql.execution.arrow.enabled', False) \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()

    def transfer(self, excel_position: str, sql_file_position: str):
        excel_full_path = excel_position
        sql_full_path = sql_file_position
        # Relative paths are resolved against the project base directory.
        if not excel_position.startswith("/"):
            excel_full_path = os.path.join(self.BASE_DIR, excel_position)
        print(f'The Excel file will be written to: {excel_full_path}')
        if not sql_file_position.startswith("/"):
            sql_full_path = os.path.join(self.BASE_DIR, sql_file_position)
        print(f'Reading the SQL file from: {sql_full_path}')
        module_list, sql = read_file(sql_full_path)
        print(f"Executing SQL: {sql}")
        register_udf(self.spark, module_list)
        pd_df = self.spark.sql(sql).toPandas()
        pd_df.to_excel(excel_full_path, index=False)
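
# Usage sketch (both paths are hypothetical examples):
#   Hive2ExcelUtil().transfer("output/report.xlsx", "sql/report.sql")
# Note: pandas' to_excel() needs an Excel writer engine such as openpyxl
# installed for .xlsx output.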

if __name__ == '__main__':
    sql_file_position_input = input("Enter the relative or absolute path of your SQL file: ")
    sql_file_position = sql_file_position_input.strip()
    excel_position_input = input("Enter the relative or absolute path of the Excel file to generate: ")
    excel_position = excel_position_input.strip()
    if excel_position.endswith("/"):
        # A directory was given: generate a random file name inside it.
        excel_position = f'{excel_position}{generate_random_string(10)}_output.xlsx'
    else:
        # A file name was given: force the .xlsx extension.
        excel_path_name = os.path.splitext(excel_position)[0]
        excel_position = f'{excel_path_name}.xlsx'
    Hive2ExcelUtil().transfer(excel_position, sql_file_position)
    print("=================transfer completed!=======================")