# excel_to_hive_utils.py
  1. import os
  2. import pandas as pd
  3. import re
  4. from pypinyin import lazy_pinyin
  5. from pyspark.sql import SparkSession
  6. # 对列类型进行特殊转换
  7. # dtype={'注册号': str}
  8. dtype=None
  9. # dtype={'sygj': str,'hgbm_source': str}
  10. # 对hive表结构进行自定义指定
  11. # hive_table_schema = "fj:string,hgbm:string,sygj:string,hgbm_source:string,hgbmms_en:string,hgbmms_cn:string,cphy_en:string,cphy_en1:string,cphy_cn:string,cpdl_cn:string,cpgg_en:string,cpgg_cn:string"
  12. hive_table_schema = None
  13. class Excel2HiveUtil:
  14. def __init__(self):
  15. base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  16. self.BASE_DIR = base_dir
  17. def create_hive_table(self,spark_session,db:str,table_name:str,sheet_name:str,columns:list):
  18. table_name = f"{self.chinese_to_pinyin(table_name,True)}_{self.chinese_to_pinyin(str(sheet_name),False).lower()}"
  19. # columns = [chinese_to_pinyin(col) for col in columns]
  20. table_name = re.sub("[^a-zA-Z0-9_]","",table_name)
  21. columns = [f"`{re.sub('[^a-zA-Z0-9_]', '',self.chinese_to_pinyin(col,False))}` string COMMENT '{col}'" for col in columns]
  22. col_str=",".join(columns)
  23. create_tbl_sql = f"CREATE TABLE IF NOT EXISTS {db}.{table_name} ({col_str}) STORED AS ORC"
  24. print("建表语句为:",create_tbl_sql)
  25. is_right = input("请检查建表语句,确认是否继续执行(y or n):")
  26. if is_right.strip().upper() == 'Y':
  27. pass
  28. elif is_right.strip().upper() == 'N':
  29. exit(1)
  30. else:
  31. print("输入不满足要求,结束任务")
  32. exit(1)
  33. # spark_session_name="task_"+db+"_"+table_name
  34. spark_session.sql(create_tbl_sql)
  35. return f"{db}.{table_name}"
  36. def chinese_to_pinyin(self,text,is_filter_digit:bool):
  37. if is_filter_digit:
  38. # 调用lazy_pinyin函数获取每个汉字对应的拼音列表
  39. pinyins = [py for py in filter(str.isalpha, lazy_pinyin(text))]
  40. else:
  41. # 调用lazy_pinyin函数获取每个汉字对应的拼音列表
  42. pinyins = [re.sub(r'(?<=^)[^a-zA-Z0-9]+|[^a-zA-Z0-9]+(?=$)', '', py) for py in lazy_pinyin(text)]
  43. return '_'.join(pinyins)
  44. def run(self,excel_position:str,hive_db:str='tmp',sheet_name_list=[0]):
  45. spark = SparkSession.builder \
  46. .appName("ExcelToHive") \
  47. .master("yarn") \
  48. .config('hive.exec.orc.default.block.size', 134217728) \
  49. .config('spark.debug.maxToStringFields', 5000) \
  50. .config('spark.dynamicAllocation.enabled', False) \
  51. .config('spark.files.ignoreCorruptFiles', True) \
  52. .config('spark.sql.adaptive.enabled', 'true') \
  53. .config('spark.sql.broadcastTimeout', -1) \
  54. .config('spark.sql.codegen.wholeStage', 'false') \
  55. .config('spark.sql.execution.arrow.enabled', True) \
  56. .config('spark.sql.execution.arrow.fallback.enabled', True) \
  57. .config('spark.sql.files.ignoreCorruptFiles', True) \
  58. .config('spark.sql.statistics.fallBackToHdfs', True) \
  59. .config('spark.yarn.queue', "default") \
  60. .enableHiveSupport().getOrCreate()
  61. if excel_position.startswith('/'):
  62. full_path = excel_position
  63. else:
  64. full_path = os.path.join(self.BASE_DIR, excel_position)
  65. file_name = os.path.basename(full_path).replace('_', '').split('.')[0]
  66. if sheet_name_list is None:
  67. # 读取Excel文件
  68. excel_data_dict = pd.read_excel(full_path, sheet_name=None,dtype=dtype)
  69. try:
  70. for sheet,excel_data in excel_data_dict.items():
  71. excel_data.fillna(value='', inplace=True)
  72. db_tbl = self.create_hive_table(spark, hive_db, file_name, sheet, excel_data.columns)
  73. # 将pandas DataFrame转换为Spark DataFrame
  74. spark_df = spark.createDataFrame(excel_data, schema=hive_table_schema)
  75. # print(spark_df.schema)
  76. # spark_df.select("*").show()
  77. spark_df.write.insertInto(db_tbl, True)
  78. except Exception as e:
  79. print(e)
  80. exit(1)
  81. else:
  82. for sheet in sheet_name_list:
  83. try:
  84. # 读取Excel文件
  85. excel_data = pd.read_excel(full_path, sheet_name=sheet,dtype=dtype)
  86. except Exception as e:
  87. print(e)
  88. else:
  89. excel_data.fillna(value='',inplace=True)
  90. # print(excel_data)
  91. db_tbl = self.create_hive_table(spark, hive_db, file_name, sheet,excel_data.columns)
  92. # 将pandas DataFrame转换为Spark DataFrame
  93. spark_df = spark.createDataFrame(excel_data,schema=hive_table_schema)
  94. # print(spark_df.schema)
  95. # spark_df.select("*").show()
  96. spark_df.write.insertInto(db_tbl, True)
  97. spark.stop()
  98. if __name__ == '__main__':
  99. excel_position_input = input("请输入你要转换的表相对路径或绝对路径:")
  100. excel_position = excel_position_input.strip()
  101. hive_db_input = input("请输入你要插入的hive库名,默认为[tmp]库:")
  102. hive_db = hive_db_input.strip()
  103. if not hive_db:
  104. hive_db = 'tmp'
  105. sheet_list_input = input("请输入你要转换的sheet列表(以英文逗号分隔),如果转换所有sheet请输入None,不输入默认只转换第一张sheet: ")
  106. if sheet_list_input.strip() == 'None':
  107. Excel2HiveUtil().run(excel_position,hive_db, None)
  108. elif sheet_list_input.strip() == '':
  109. Excel2HiveUtil().run(excel_position,hive_db)
  110. else:
  111. sheet_list = [i.strip() for i in sheet_list_input.split(",")]
  112. Excel2HiveUtil().run(excel_position,hive_db, sheet_list)