"""For each Hive table listed in a config file, count the total and distinct
values of every column, write the results into one Excel workbook (one sheet
per table), then upload the workbook to HDFS."""

from dw_base.spark.spark_sql import SparkSQL
import pandas as pd
import os
import subprocess

spark = SparkSQL()


def get_hive_cols(tbl):
    """Build a "counts UNION ALL distinct counts" SQL statement for *tbl*.

    Args:
        tbl: Hive table name (``db.table``).

    Returns:
        A ``SELECT COUNT(col)... FROM tbl UNION ALL SELECT
        COUNT(DISTINCT col)... FROM tbl`` string, or ``None`` when *tbl*
        is empty.
    """
    if not tbl:
        print(f"参数异常 tbl = {tbl}")
        return None
    sql = f'SHOW CREATE TABLE {tbl}'
    # Extract the column names from the SHOW CREATE TABLE DDL text:
    # everything between the first "(" and the last ")".
    create_table_statement = spark.query(sql)[0].collect()[0][0]
    fields_start_index = create_table_statement.find("(") + 1
    fields_end_index = create_table_statement.rfind(")")
    fields_str = create_table_statement[fields_start_index:fields_end_index]
    # NOTE(review): splitting on "," misparses parameterized column types
    # such as decimal(10,2) — the "2)" fragment becomes a bogus "column".
    # Confirm none of the configured tables use such types, or switch to a
    # metastore-based column listing (e.g. DESCRIBE).
    fields_list = [field.split()[0] for field in fields_str.split(",")]
    new_fields_list = [item.replace("`", "") for item in fields_list]
    # One aliased aggregate per column for each of the two result rows.
    select_query = ", ".join(f"COUNT({field}) as {field}" for field in new_fields_list)
    dist_select_query = ", ".join(f"COUNT(DISTINCT {field}) as {field}" for field in new_fields_list)
    count_query = f"SELECT {select_query} FROM {tbl}"
    distinct_count_query = f"SELECT {dist_select_query} FROM {tbl}"
    return f"{count_query} UNION ALL {distinct_count_query}"


def querySQLAndInsert2excel(sql, excel_file_path, sheet_name, mode):
    """Run *sql* through Spark and write the result as one Excel sheet.

    Args:
        sql: query to execute (``None``/empty is rejected).
        excel_file_path: target ``.xlsx`` path on the local filesystem.
        sheet_name: Excel sheet name (caller enforces the 31-char limit).
        mode: ``'w'`` to (re)create the workbook, ``'a'`` to append a sheet.
    """
    if not sql:
        print(f"参数异常 sql = {sql}")
        return
    if not sheet_name:
        print(f"参数异常 sheet_name = {sheet_name}")
        return
    if not excel_file_path:
        print(f"参数异常 excel_file_path = {excel_file_path}")
        return
    # Convert the Spark result to pandas, then write it out as one sheet.
    df_pandas = spark.query(sql)[0].toPandas()
    with pd.ExcelWriter(excel_file_path, mode=mode) as writer:
        df_pandas.to_excel(writer, sheet_name=sheet_name, index=False)


def save2hdfs(file_path, hdfs_path):
    """Upload *file_path* to *hdfs_path*, overwriting any existing copy.

    Uses ``subprocess.run`` with an argument list instead of ``os.system``
    with an interpolated string, so paths containing spaces or shell
    metacharacters cannot break (or inject into) the command.

    Args:
        file_path: source file on the local filesystem.
        hdfs_path: destination HDFS directory/path.
    """
    if file_path and hdfs_path:
        subprocess.run(["hadoop", "fs", "-put", "-f", file_path, hdfs_path])
    else:
        print(f"参数异常 file_path = {file_path} hdfs_path = {hdfs_path}")


def main():
    """Drive the batch: one sheet per table listed in tables.txt, then upload."""
    file_name = "/home/dev005/tendata-warehouse/workspace/data/tables.txt"
    file_path = "/home/dev005/tendata-warehouse/workspace/data/cnt.xlsx"
    hdfs_path = "/user/dev005/workspace/"
    first_line = True  # True until the first sheet has been written
    with open(file_name, 'r') as file:
        # One table name per line.
        for line in file:
            table_name = line.strip()
            # Bug fix: skip blank lines. Previously a blank line still
            # consumed the 'w' (overwrite) slot via the early-return path,
            # so if it was the first line every later write appended to a
            # workbook that was never created.
            if not table_name:
                continue
            # Sheet name = table part of "db.table", capped at Excel's
            # 31-character sheet-name limit.
            sheet_name = table_name.split('.', 1)[1][:31] if '.' in table_name else "未找到小数点"
            cols = get_hive_cols(table_name)
            if cols is None:
                continue
            # First table overwrites the workbook; the rest append sheets.
            if first_line:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'w')
                first_line = False
            else:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'a')
    save2hdfs(file_path, hdfs_path)


if __name__ == '__main__':
    main()