import os
import re

import pandas as pd
from pypinyin import lazy_pinyin
from pyspark.sql import SparkSession

# Optional per-column dtype overrides for pandas.read_excel,
# e.g. dtype={'注册号': str} to preserve leading zeros.
dtype = None

# Optional explicit schema passed to spark.createDataFrame, e.g.
# "fj:string,hgbm:string,sygj:string,...". None lets Spark infer
# the schema from the pandas DataFrame.
hive_table_schema = None


class Excel2HiveUtil:
    """Load Excel sheets into Hive ORC tables through Spark.

    Table and column identifiers are derived by transliterating the
    Chinese file/sheet/column names to pinyin and stripping every
    character that is not legal in a Hive identifier.
    """

    def __init__(self):
        # Project root: the parent of this file's directory. Relative
        # Excel paths given to run() are resolved against it.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.BASE_DIR = base_dir

    def create_hive_table(self, spark_session, db: str, table_name: str,
                          sheet_name: str, columns: list):
        """Create (if absent) a Hive ORC table for one sheet.

        The table is named "<file_pinyin>_<sheet_pinyin>" and every column
        is typed string, with the original (Chinese) header kept as the
        column COMMENT. The generated DDL is shown to the operator, who
        must confirm with y/Y; any other answer terminates the process.

        Returns the fully qualified "db.table" name.
        """
        table_name = (f"{self.chinese_to_pinyin(table_name, True)}"
                      f"_{self.chinese_to_pinyin(str(sheet_name), False).lower()}")
        # Hive identifiers may only contain [a-zA-Z0-9_].
        table_name = re.sub("[^a-zA-Z0-9_]", "", table_name)
        # `pinyin_name` string COMMENT 'original header'
        columns = [
            f"`{re.sub('[^a-zA-Z0-9_]', '', self.chinese_to_pinyin(col, False))}`"
            f" string COMMENT '{col}'"
            for col in columns
        ]
        col_str = ",".join(columns)
        create_tbl_sql = (f"CREATE TABLE IF NOT EXISTS {db}.{table_name} "
                          f"({col_str}) STORED AS ORC")
        print("建表语句为:", create_tbl_sql)
        is_right = input("请检查建表语句,确认是否继续执行(y or n):")
        answer = is_right.strip().upper()  # hoisted: was evaluated twice
        if answer == 'Y':
            pass
        elif answer == 'N':
            exit(1)
        else:
            print("输入不满足要求,结束任务")
            exit(1)
        spark_session.sql(create_tbl_sql)
        return f"{db}.{table_name}"

    def chinese_to_pinyin(self, text, is_filter_digit: bool):
        """Transliterate *text* to underscore-joined pinyin syllables.

        is_filter_digit=True drops every syllable that is not purely
        alphabetic; False keeps all syllables but strips leading and
        trailing non-alphanumeric characters from each one.
        """
        if is_filter_digit:
            pinyins = [py for py in lazy_pinyin(text) if py.isalpha()]
        else:
            # ^...+|...+$ is equivalent to the original zero-width
            # lookaround form, just plainer.
            pinyins = [re.sub(r'^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$', '', py)
                       for py in lazy_pinyin(text)]
        return '_'.join(pinyins)

    def run(self, excel_position: str, hive_db: str = 'tmp',
            sheet_name_list=(0,)):
        """Read an Excel workbook and insert the requested sheets into Hive.

        excel_position: absolute path, or a path relative to BASE_DIR.
        hive_db: target Hive database (default 'tmp').
        sheet_name_list: iterable of sheet names/indices to convert;
            None converts every sheet. Default converts only the first
            sheet. (Was a mutable default list; now an immutable tuple.)
        """
        spark = (SparkSession.builder
                 .appName("ExcelToHive")
                 .master("yarn")
                 .config('hive.exec.orc.default.block.size', 134217728)
                 .config('spark.debug.maxToStringFields', 5000)
                 .config('spark.dynamicAllocation.enabled', False)
                 .config('spark.files.ignoreCorruptFiles', True)
                 .config('spark.sql.adaptive.enabled', 'true')
                 .config('spark.sql.broadcastTimeout', -1)
                 .config('spark.sql.codegen.wholeStage', 'false')
                 .config('spark.sql.execution.arrow.enabled', True)
                 .config('spark.sql.execution.arrow.fallback.enabled', True)
                 .config('spark.sql.files.ignoreCorruptFiles', True)
                 .config('spark.sql.statistics.fallBackToHdfs', True)
                 .config('spark.yarn.queue', "default")
                 .enableHiveSupport().getOrCreate())
        try:
            if excel_position.startswith('/'):
                full_path = excel_position
            else:
                full_path = os.path.join(self.BASE_DIR, excel_position)
            # Base name without underscores or extension becomes the
            # table-name prefix.
            file_name = os.path.basename(full_path).replace('_', '').split('.')[0]
            if sheet_name_list is None:
                # sheet_name=None makes pandas return {sheet_name: DataFrame}.
                excel_data_dict = pd.read_excel(full_path, sheet_name=None,
                                                dtype=dtype)
                try:
                    for sheet, excel_data in excel_data_dict.items():
                        excel_data.fillna(value='', inplace=True)
                        db_tbl = self.create_hive_table(
                            spark, hive_db, file_name, sheet,
                            excel_data.columns)
                        spark_df = spark.createDataFrame(
                            excel_data, schema=hive_table_schema)
                        # overwrite=True: replace any existing table data
                        spark_df.write.insertInto(db_tbl, True)
                except Exception as e:
                    print(e)
                    exit(1)
            else:
                for sheet in sheet_name_list:
                    try:
                        excel_data = pd.read_excel(full_path, sheet_name=sheet,
                                                   dtype=dtype)
                    except Exception as e:
                        # Best-effort: report the unreadable sheet and
                        # continue with the remaining ones.
                        print(e)
                    else:
                        excel_data.fillna(value='', inplace=True)
                        db_tbl = self.create_hive_table(
                            spark, hive_db, file_name, sheet,
                            excel_data.columns)
                        spark_df = spark.createDataFrame(
                            excel_data, schema=hive_table_schema)
                        spark_df.write.insertInto(db_tbl, True)
        finally:
            # Previously skipped on the exit(1) error path; finally
            # guarantees the Spark session is released either way.
            spark.stop()


if __name__ == '__main__':
    excel_position_input = input("请输入你要转换的表相对路径或绝对路径:")
    excel_position = excel_position_input.strip()
    hive_db_input = input("请输入你要插入的hive库名,默认为[tmp]库:")
    hive_db = hive_db_input.strip()
    if not hive_db:
        hive_db = 'tmp'
    sheet_list_input = input("请输入你要转换的sheet列表(以英文逗号分隔),如果转换所有sheet请输入None,不输入默认只转换第一张sheet: ")
    choice = sheet_list_input.strip()
    if choice == 'None':
        Excel2HiveUtil().run(excel_position, hive_db, None)
    elif choice == '':
        Excel2HiveUtil().run(excel_position, hive_db)
    else:
        # NOTE(review): sheets are passed as strings even when the user
        # types indices like "0" — pandas then looks them up by NAME.
        # Preserved as-is; confirm this matches operator expectations.
        sheet_list = [i.strip() for i in sheet_list_input.split(",")]
        Excel2HiveUtil().run(excel_position, hive_db, sheet_list)