```python
from dw_base.spark.spark_sql import SparkSQL
import pandas as pd
import os

spark = SparkSQL()


def get_hive_cols(tbl):
    """
    Args:
        tbl: Hive table name.

    Returns:
        The concatenated query string
        "SELECT COUNT(col...) FROM table UNION ALL SELECT COUNT(DISTINCT col...) FROM table",
        or None when the table name is empty.
    """
    if not tbl:
        print(f"Invalid argument: tbl = {tbl}")
        return None
    sql = f'SHOW CREATE TABLE {tbl}'
    # Parse the SHOW CREATE TABLE output to extract the column list.
    create_table_statement = spark.query(sql)[0].collect()[0][0]
    fields_start_index = create_table_statement.find("(") + 1
    fields_end_index = create_table_statement.rfind(")")
    fields_str = create_table_statement[fields_start_index:fields_end_index]
    # Each entry looks like "`col_name` type ..."; the first token is the column name.
    # Caveat: splitting on "," breaks for types that contain commas, such as
    # decimal(10,2); see the DESCRIBE-based alternative below.
    fields_list = [field.split()[0] for field in fields_str.split(",")]
    new_fields_list = [item.replace("`", "") for item in fields_list]
    # Build the projection lists: total counts and distinct counts per column.
    select_query = ", ".join([f"COUNT({field}) AS {field}" for field in new_fields_list])
    dist_select_query = ", ".join([f"COUNT(DISTINCT {field}) AS {field}" for field in new_fields_list])
    # Stitch the two SELECTs together with UNION ALL.
    count_query = f"SELECT {select_query} FROM {tbl}"
    distinct_count_query = f"SELECT {dist_select_query} FROM {tbl}"
    return f"{count_query} UNION ALL {distinct_count_query}"
```
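Splitting the DDL text on commas is fragile: any column type that itself contains a comma, such as `decimal(10,2)`, corrupts the field list. A more robust variant reads column names from `DESCRIBE` instead. The sketch below is not part of the original script; it assumes, as the original code does, that `spark.query(...)[0]` returns a standard Spark DataFrame, and that the `DESCRIBE` output ends its column section at the blank row before `# Partition Information`.

```python
def get_hive_cols_v2(tbl):
    """Like get_hive_cols, but reads column names from DESCRIBE,
    which is immune to commas inside type definitions."""
    if not tbl:
        print(f"Invalid argument: tbl = {tbl}")
        return None
    rows = spark.query(f"DESCRIBE {tbl}")[0].collect()
    fields = []
    for row in rows:
        col_name = row[0].strip()
        # DESCRIBE appends a "# Partition Information" section after a blank
        # row; stop there so partition columns are not collected twice.
        if not col_name or col_name.startswith("#"):
            break
        fields.append(col_name)
    select_query = ", ".join(f"COUNT({f}) AS {f}" for f in fields)
    dist_select_query = ", ".join(f"COUNT(DISTINCT {f}) AS {f}" for f in fields)
    return f"SELECT {select_query} FROM {tbl} UNION ALL SELECT {dist_select_query} FROM {tbl}"
```

Next comes the function that runs the query and writes the result into an Excel sheet.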
```python
def querySQLAndInsert2excel(sql, excel_file_path, sheet_name, mode):
    """
    Run a batch SQL statement and write its result set into an Excel file.

    Args:
        sql: The SQL statement to execute.
        excel_file_path: Target path in the Linux working directory.
        sheet_name: Name of the Excel sheet to write.
        mode: 'w' to overwrite the workbook, 'a' to append a new sheet.
    """
    if not sql:
        print(f"Invalid argument: sql = {sql}")
        return
    if not sheet_name:
        print(f"Invalid argument: sheet_name = {sheet_name}")
        return
    if not excel_file_path:
        print(f"Invalid argument: excel_file_path = {excel_file_path}")
        return
    # Convert the Spark SQL result into a pandas DataFrame.
    df_pandas = spark.query(sql)[0].toPandas()
    # Write to the given path; 'w' overwrites the file, 'a' appends a sheet
    # (append mode needs the openpyxl engine and an existing workbook).
    with pd.ExcelWriter(excel_file_path, mode=mode) as writer:
        df_pandas.to_excel(writer, sheet_name=sheet_name, index=False)
```
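One sharp edge here is that `pd.ExcelWriter(..., mode='a')` raises when the workbook does not exist yet, so callers must track whether they are writing the first sheet. A small wrapper can hide that. `write_sheet` below is a hypothetical helper, not part of the original script, and it assumes the workbook is removed before each run so that "file exists" means "this run already wrote a sheet".

```python
def write_sheet(sql, excel_file_path, sheet_name):
    """Append a sheet, creating the workbook on first use."""
    mode = 'a' if os.path.exists(excel_file_path) else 'w'
    querySQLAndInsert2excel(sql, excel_file_path, sheet_name, mode)
```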
```python
def save2hdfs(file_path, hdfs_path):
    """
    Args:
        file_path: Source file path on the local Linux filesystem.
        hdfs_path: Target HDFS location.
    """
    if file_path and hdfs_path:
        # -f overwrites the target on HDFS if it already exists.
        os.system(f"hadoop fs -put -f {file_path} {hdfs_path}")
    else:
        print(f"Invalid arguments: file_path = {file_path} hdfs_path = {hdfs_path}")
```
```python
if __name__ == '__main__':
    file_name = "/home/dev005/tendata-warehouse/workspace/data/tables.txt"
    file_path = "/home/dev005/tendata-warehouse/workspace/data/cnt.xlsx"
    hdfs_path = "/user/dev005/workspace/"
    first_line = True  # True while processing the first table
    # Read the table list, one fully qualified table name per line.
    with open(file_name, 'r') as file:
        for line in file:
            table_name = line.strip()
            # Use the part after "db." as the sheet name; Excel caps sheet
            # names at 31 characters.
            sheet_name = table_name.split('.', 1)[1][:31] if '.' in table_name else "no_dot_found"
            # Build the combined COUNT / COUNT(DISTINCT) query for the table.
            cols = get_hive_cols(table_name)
            # The first table overwrites the workbook; later tables append sheets.
            if first_line:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'w')
                first_line = False
            else:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'a')
    save2hdfs(file_path, hdfs_path)
```
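With the hypothetical `write_sheet` helper from earlier, the `first_line` flag disappears entirely. A sketch of the simplified driver, under the same path assumptions as the original:

```python
if __name__ == '__main__':
    file_name = "/home/dev005/tendata-warehouse/workspace/data/tables.txt"
    file_path = "/home/dev005/tendata-warehouse/workspace/data/cnt.xlsx"
    hdfs_path = "/user/dev005/workspace/"
    # Start fresh so write_sheet's existence check means "first sheet of this run".
    if os.path.exists(file_path):
        os.remove(file_path)
    with open(file_name, 'r') as file:
        for line in file:
            table_name = line.strip()
            if not table_name:
                continue  # skip blank lines in the table list
            sheet_name = table_name.split('.', 1)[1][:31] if '.' in table_name else "no_dot_found"
            write_sheet(get_hive_cols(table_name), file_path, sheet_name)
    save2hdfs(file_path, hdfs_path)
```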