| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import os
- import pandas as pd
- import re
- from pypinyin import lazy_pinyin
- from pyspark.sql import SparkSession
# Optional per-column dtype overrides handed to pandas.read_excel,
# e.g. to keep leading zeros by forcing a column to str:
# dtype={'注册号': str}
# dtype={'sygj': str,'hgbm_source': str}
dtype=None
# Optional explicit schema string passed to spark.createDataFrame as the
# `schema` argument; None lets Spark derive it from the pandas DataFrame.
# hive_table_schema = "fj:string,hgbm:string,sygj:string,hgbm_source:string,hgbmms_en:string,hgbmms_cn:string,cphy_en:string,cphy_en1:string,cphy_cn:string,cpdl_cn:string,cpgg_en:string,cpgg_cn:string"
hive_table_schema = None
class Excel2HiveUtil:
    """Load Excel worksheets into Hive tables through Spark.

    Each selected sheet becomes one ORC-backed Hive table.  Table and
    column names are derived from the (possibly Chinese) file / sheet /
    header names by transliterating them to pinyin and stripping every
    character outside [a-zA-Z0-9_]; the original header text is kept as
    the Hive column comment.
    """

    def __init__(self):
        # Project root: two directory levels above this file.  Relative
        # Excel paths passed to run() are resolved against it.
        self.BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def create_hive_table(self, spark_session, db: str, table_name: str, sheet_name, columns: list) -> str:
        """Create (IF NOT EXISTS) a Hive ORC table for one sheet.

        The table name is "<file-pinyin>_<sheet-pinyin>", lowercasing the
        sheet part and dropping any character outside [a-zA-Z0-9_].  All
        columns are created as STRING.  The generated DDL is printed and
        must be confirmed interactively; 'n' (or any unrecognized answer)
        aborts the process with exit status 1.

        Returns the fully qualified "db.table" name.
        """
        table_name = (
            f"{self.chinese_to_pinyin(table_name, True)}_"
            f"{self.chinese_to_pinyin(str(sheet_name), False).lower()}"
        )
        table_name = re.sub("[^a-zA-Z0-9_]", "", table_name)
        columns = [
            f"`{re.sub('[^a-zA-Z0-9_]', '', self.chinese_to_pinyin(col, False))}` string COMMENT '{col}'"
            for col in columns
        ]
        col_str = ",".join(columns)
        create_tbl_sql = f"CREATE TABLE IF NOT EXISTS {db}.{table_name} ({col_str}) STORED AS ORC"
        print("建表语句为:", create_tbl_sql)
        answer = input("请检查建表语句,确认是否继续执行(y or n):").strip().upper()
        if answer != 'Y':
            if answer != 'N':
                print("输入不满足要求,结束任务")
            # raise SystemExit instead of the site-injected exit() helper,
            # which is not guaranteed to exist in non-interactive runs.
            raise SystemExit(1)
        spark_session.sql(create_tbl_sql)
        return f"{db}.{table_name}"

    def chinese_to_pinyin(self, text, is_filter_digit: bool) -> str:
        """Transliterate *text* to '_'-joined pinyin syllables.

        When is_filter_digit is True, any syllable that is not purely
        alphabetic (digits, punctuation, mixed tokens) is dropped;
        otherwise every syllable is kept but stripped of leading and
        trailing non-alphanumeric characters.
        """
        syllables = lazy_pinyin(text)
        if is_filter_digit:
            pinyins = [s for s in syllables if s.isalpha()]
        else:
            pinyins = [re.sub(r'^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$', '', s) for s in syllables]
        return '_'.join(pinyins)

    def _write_sheet(self, spark, hive_db: str, file_name: str, sheet, excel_data) -> None:
        # One sheet -> one table: blank out NaNs, create the table (with
        # interactive confirmation), then overwrite-insert the rows.
        excel_data.fillna(value='', inplace=True)
        db_tbl = self.create_hive_table(spark, hive_db, file_name, sheet, excel_data.columns)
        spark_df = spark.createDataFrame(excel_data, schema=hive_table_schema)
        # Second argument True -> overwrite any existing table contents.
        spark_df.write.insertInto(db_tbl, True)

    def run(self, excel_position: str, hive_db: str = 'tmp', sheet_name_list=(0,)):
        """Convert sheets of one Excel workbook into Hive tables.

        Parameters
        ----------
        excel_position : absolute path, or path relative to BASE_DIR.
        hive_db        : target Hive database (default 'tmp').
        sheet_name_list: iterable of sheet positions/names, or None to
                         convert every sheet in the workbook.  Default is
                         the first sheet only.  (Immutable default — the
                         original mutable ``[0]`` default was a bug-prone
                         shared object.)
        """
        spark = SparkSession.builder \
            .appName("ExcelToHive") \
            .master("yarn") \
            .config('hive.exec.orc.default.block.size', 134217728) \
            .config('spark.debug.maxToStringFields', 5000) \
            .config('spark.dynamicAllocation.enabled', False) \
            .config('spark.files.ignoreCorruptFiles', True) \
            .config('spark.sql.adaptive.enabled', 'true') \
            .config('spark.sql.broadcastTimeout', -1) \
            .config('spark.sql.codegen.wholeStage', 'false') \
            .config('spark.sql.execution.arrow.enabled', True) \
            .config('spark.sql.execution.arrow.fallback.enabled', True) \
            .config('spark.sql.files.ignoreCorruptFiles', True) \
            .config('spark.sql.statistics.fallBackToHdfs', True) \
            .config('spark.yarn.queue', "default") \
            .enableHiveSupport().getOrCreate()
        try:
            if excel_position.startswith('/'):
                full_path = excel_position
            else:
                full_path = os.path.join(self.BASE_DIR, excel_position)
            # Table-name stem: basename with '_' removed, truncated at the
            # first dot (keeps historical naming of existing tables).
            file_name = os.path.basename(full_path).replace('_', '').split('.')[0]
            if sheet_name_list is None:
                # sheet_name=None reads every sheet: {sheet_name: DataFrame}.
                excel_data_dict = pd.read_excel(full_path, sheet_name=None, dtype=dtype)
                try:
                    for sheet, excel_data in excel_data_dict.items():
                        self._write_sheet(spark, hive_db, file_name, sheet, excel_data)
                except Exception as e:
                    print(e)
                    raise SystemExit(1)
            else:
                for sheet in sheet_name_list:
                    try:
                        excel_data = pd.read_excel(full_path, sheet_name=sheet, dtype=dtype)
                    except Exception as e:
                        # Best-effort: report an unreadable sheet spec but
                        # keep converting the remaining sheets.
                        print(e)
                    else:
                        self._write_sheet(spark, hive_db, file_name, sheet, excel_data)
        finally:
            # Always release the Spark session, even when a confirmation
            # abort (SystemExit) or read error escapes — the original
            # leaked the session on those paths.
            spark.stop()
if __name__ == '__main__':
    # Interactive entry point: collect source path, target database and
    # sheet selection, then run the conversion.
    excel_position = input("请输入你要转换的表相对路径或绝对路径:").strip()
    hive_db = input("请输入你要插入的hive库名,默认为[tmp]库:").strip()
    if not hive_db:
        hive_db = 'tmp'
    sheet_list_input = input("请输入你要转换的sheet列表(以英文逗号分隔),如果转换所有sheet请输入None,不输入默认只转换第一张sheet: ").strip()
    if sheet_list_input == 'None':
        Excel2HiveUtil().run(excel_position, hive_db, None)
    elif sheet_list_input == '':
        Excel2HiveUtil().run(excel_position, hive_db)
    else:
        # pandas.read_excel treats int specs as zero-based sheet positions
        # and str specs as sheet names; typed-in digits must therefore be
        # converted to int, otherwise "0" is looked up as a sheet *named*
        # "0" and fails.
        sheet_list = [
            int(item) if item.isdigit() else item
            for item in (i.strip() for i in sheet_list_input.split(","))
        ]
        Excel2HiveUtil().run(excel_position, hive_db, sheet_list)
|