spark_read_hive_columns_cnt.py

import os

import pandas as pd
from dw_base.spark.spark_sql import SparkSQL

spark = SparkSQL()
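
# Assumption: SparkSQL comes from the internal dw_base library; as used below,
# spark.query(sql) is taken to return a sequence whose first element is a
# Spark DataFrame (hence the [0].collect() / [0].toPandas() calls).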


def get_hive_cols(tbl):
    """
    Args:
        tbl: Hive table name.

    Returns:
        A "SELECT COUNT(col), ... FROM table UNION ALL
        SELECT COUNT(DISTINCT col), ... FROM table" query string,
        or None if tbl is empty.
    """
    if not tbl:
        print(f"Invalid argument: tbl = {tbl}")
        return None
    sql = f'SHOW CREATE TABLE {tbl}'
    # Parse the column list out of the SHOW CREATE TABLE output.
    create_table_statement = spark.query(sql)[0].collect()[0][0]
    fields_start_index = create_table_statement.find("(") + 1
    fields_end_index = create_table_statement.rfind(")")
    fields_str = create_table_statement[fields_start_index:fields_end_index]
    # NOTE: splitting on "," assumes simple column types; parameterized or
    # nested types such as decimal(10,2) or struct<...> would break this parser.
    fields_list = [field.split()[0] for field in fields_str.split(",")]
    new_fields_list = [item.replace("`", "") for item in fields_list]
    # Build the COUNT and COUNT(DISTINCT) select lists.
    select_query = ", ".join(f"COUNT({field}) AS {field}" for field in new_fields_list)
    dist_select_query = ", ".join(f"COUNT(DISTINCT {field}) AS {field}" for field in new_fields_list)
    count_query = f"SELECT {select_query} FROM {tbl}"
    distinct_count_query = f"SELECT {dist_select_query} FROM {tbl}"
    return f"{count_query} UNION ALL {distinct_count_query}"
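
# Example (hypothetical table dw.user with columns id, name): get_hive_cols
# would return
#   SELECT COUNT(id) AS id, COUNT(name) AS name FROM dw.user
#   UNION ALL SELECT COUNT(DISTINCT id) AS id, COUNT(DISTINCT name) AS name FROM dw.user
# The first result row holds per-column non-null counts, the second row the
# distinct counts.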


def querySQLAndInsert2excel(sql, excel_file_path, sheet_name, mode):
    """
    Run the given SQL and write the result set into an Excel file.

    Args:
        sql: SQL statement to execute.
        excel_file_path: Target path in the local (Linux) working directory.
        sheet_name: Name of the Excel sheet to write.
        mode: ExcelWriter mode, 'w' to overwrite the workbook or 'a' to append a sheet.
    """
    if not sql:
        print(f"Invalid argument: sql = {sql}")
        return
    if not sheet_name:
        print(f"Invalid argument: sheet_name = {sheet_name}")
        return
    if not excel_file_path:
        print(f"Invalid argument: excel_file_path = {excel_file_path}")
        return
    # Convert the Spark SQL result to a pandas DataFrame.
    df_pandas = spark.query(sql)[0].toPandas()
    # Write (or append) the sheet; mode='a' requires the openpyxl engine and
    # an existing workbook.
    with pd.ExcelWriter(excel_file_path, mode=mode) as writer:
        df_pandas.to_excel(writer, sheet_name=sheet_name, index=False)
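
# Usage sketch (hypothetical SQL and path): overwrite the workbook with the
# first sheet, then append further sheets:
#   querySQLAndInsert2excel("SELECT 1 AS x", "/tmp/out.xlsx", "first", "w")
#   querySQLAndInsert2excel("SELECT 2 AS y", "/tmp/out.xlsx", "second", "a")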


def save2hdfs(file_path, hdfs_path):
    """
    Args:
        file_path: Source file path on the local (Linux) filesystem.
        hdfs_path: Destination directory on HDFS.
    """
    if file_path and hdfs_path:
        # -f overwrites the target on HDFS if it already exists.
        os.system(f"hadoop fs -put -f {file_path} {hdfs_path}")
    else:
        print(f"Invalid arguments: file_path = {file_path} hdfs_path = {hdfs_path}")
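
# Example (hypothetical paths): save2hdfs("/tmp/cnt.xlsx", "/user/dev005/workspace/")
# shells out to: hadoop fs -put -f /tmp/cnt.xlsx /user/dev005/workspace/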


if __name__ == '__main__':
    file_name = "/home/dev005/tendata-warehouse/workspace/data/tables.txt"
    file_path = "/home/dev005/tendata-warehouse/workspace/data/cnt.xlsx"
    hdfs_path = "/user/dev005/workspace/"
    first_line = True  # True while handling the first table, so the workbook is overwritten once
    with open(file_name, 'r') as file:
        # Read the table list line by line.
        for line in file:
            table_name = line.strip()
            if not table_name:
                continue  # skip blank lines
            # Excel sheet names are capped at 31 characters; use the part after
            # the database prefix ("db.table" -> "table").
            sheet_name = table_name.split('.', 1)[1][:31] if '.' in table_name else "no dot found"
            # Build the COUNT / COUNT(DISTINCT) query over the table's columns.
            cols = get_hive_cols(table_name)
            # Overwrite the workbook for the first table, append for the rest.
            if first_line:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'w')
                first_line = False
            else:
                querySQLAndInsert2excel(cols, file_path, sheet_name, 'a')
    # Upload the finished workbook to HDFS once all tables are processed.
    save2hdfs(file_path, hdfs_path)
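
# Expected tables.txt format (inferred from the parsing above): one
# fully-qualified "db.table" name per line, e.g. (hypothetical names)
#   dw.orders
#   dw.users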