import os
import re
import subprocess
import sys

abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from pyspark.sql import SparkSession
from dw_base import NORM_MGT, NORM_GRN
from dw_base.utils.log_utils import pretty_print

"""
author: HQL
create_time: 2024-05-13
update_time: 2024-05-14
remarks:
    The methods in this script merge small HDFS files. Two merge approaches are provided,
    plus a scan helper:
    1. merge_hdfs_file(db_name, table_name, partition_name, file_merge_size)
       Recommended for Hive managed tables (external tables are not supported; for an
       external table use the absolute-path method below).
       db_name: database name
       table_name: table name
       partition_name: partition directory, multi-level supported, e.g. dt=20240202/topic=aaaa
       file_merge_size: target merged file size, default 128 MB
    2. merge_hdfs_file_absolute_path(file_path, file_type, file_merge_size, compression_type)
       Recommended for merging data directly under an absolute HDFS path
       (e.g. /user/dev005/workspace/flume_test/dt=20240418).
       file_path: absolute HDFS path
       file_merge_size: target merged file size
       file_type: file format, e.g. orc, parquet, text; default text
    3. scan_hdfs_in_hive_tbl(table_name)
       Takes a table name qualified with its database (db_name.table_name) and returns
       optimization suggestions, the small-file count and the space usage of that table.
       For a whole-database scan, run SHOW TABLES yourself and call scan_hdfs_in_hive_tbl
       for every table (this relies on dfs commands and is fairly slow; ideally operations
       should pull the Hive metadata from MySQL directly).
"""


def get_spark_session():
    """
    Returns: a SparkSession builder pre-configured for the merge jobs
    """
    session = SparkSession.builder \
        .appName("collect_hdfs_file") \
        .master("yarn") \
        .config('spark.default.parallelism', 8) \
        .config('spark.driver.cores', 1) \
        .config('spark.driver.memory', '1g') \
        .config('spark.executor.cores', 2) \
        .config('spark.executor.instances', 4) \
        .config('spark.executor.memory', '6g') \
        .config('spark.sql.shuffle.partitions', '8') \
        .config('spark.yarn.queue', 'spark') \
        .config('spark.sql.hive.convertMetastoreOrc', 'false')
    return session


def get_hive_mete():
    # Read the Hive metastore table list (TBLS) directly from MySQL via JDBC
    df = spark.read.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://m3:3306/hive") \
        .option("dbtable", "TBLS") \
        .option("user", "hive") \
        .option("password", "Tendata_hive2024") \
        .load()
    df.show(10)


def get_hive_context():
    """
    Returns: a SparkSession with Hive support enabled
    """
    session = get_spark_session()
    hive = session.enableHiveSupport() \
        .config('hive.input.format', 'org.apache.hadoop.hive.ql.io.CombineHiveInputFormat') \
        .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
        .config('hive.exec.dynamic.partition', 'true') \
        .getOrCreate()
    return hive


spark = get_spark_session().getOrCreate()
hive = get_hive_context()


def hdfs_estimate_num_partitions(db_name, table_name, partition_name, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partition_name: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    if file_merge_size is None or file_merge_size == '':
        file_merge_size = 128
    if db_name is None or table_name is None or partition_name == '':
        pretty_print(
            f'{NORM_MGT}Please pass valid arguments db_name:{db_name}, table_name:{table_name}, partition_name:{partition_name}\n{NORM_GRN}')
        sys.exit()
    if partition_name is None:
        pretty_print(f'{NORM_MGT}Non-partitioned table\n{NORM_GRN}')
    # Default Hive warehouse root
    default_warehouse_path = "/user/hive/warehouse/"
    # Absolute path of the data files
    if partition_name is not None:
        file_path = default_warehouse_path + db_name + ".db/" + table_name + "/" + partition_name
    else:
        file_path = default_warehouse_path + db_name + ".db/" + table_name
    # Build the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    # Run the command
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = result.strip().split()[0]
    pretty_print(f'{NORM_MGT}{file_path}: total size before compression is {file_size} bytes\n{NORM_GRN}')
    # Divide by file_merge_size (MB) to get the number of slices
    if int(file_size) == 0:
        return 0
    else:
        return int(int(file_size) / file_merge_size / 1024 / 1024) + 1
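
# Illustrative arithmetic for the slice estimate above (the numbers are made up):
# a partition holding 1073741824 bytes (1 GB) with file_merge_size=128 gives
# int(1073741824 / 128 / 1024 / 1024) + 1 = 8 + 1 = 9 target files; the trailing +1
# keeps the result at least 1 whenever the partition is non-empty.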

def hdfs_estimate_num_partitions_absolute_path(file_path, file_merge_size=128):
    """
    Args:
        file_path: absolute HDFS path
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    # Build the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    # Run the command
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = result.strip().split()[0]
    pretty_print(f'{NORM_MGT}{file_path}: total size before compression is {file_size} bytes\n{NORM_GRN}')
    # Divide by file_merge_size (MB) to get the number of slices
    if int(file_size) == 0:
        return 0
    else:
        return int(int(file_size) / file_merge_size / 1024 / 1024) + 1


def merge_hdfs_file(db_name, table_name, partitions, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partitions: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of files after merging (num_partitions)
    """
    if db_name is None or table_name is None or partitions == '':
        pretty_print(
            f'{NORM_MGT}Please pass valid arguments db_name:{db_name}, table_name:{table_name}, partition_name:{partitions}\n{NORM_GRN}')
        sys.exit()
    # Estimate how many slices the data should be merged into
    num_partitions = hdfs_estimate_num_partitions(db_name, table_name, partitions, file_merge_size)
    # Skip the merge when the partition contains no files
    if num_partitions == 0:
        print(f"Table {table_name} has no data files under partition {partitions}, or the files are corrupted")
        sys.exit()
    # Partitioned and non-partitioned tables are handled separately
    if partitions is not None:
        # Handle multi-level partitions
        partition_list = partitions.split('/')
        # Extract the partition key/value pairs
        conditions = [segment.split('=') for segment in partition_list]
        # Extract the partition keys
        keys = ','.join([segment.split('=')[0] for segment in partition_list])
        # Build the filter conditions
        condition_str = ' and '.join([f"{key}='{value}'" for key, value in conditions])
        # Assemble the WHERE clause
        where_clause = f"WHERE {condition_str}"
        pretty_print(
            f'{NORM_MGT}Partitioned table merge started, db_name:{db_name}, table_name:{table_name}, partition={partitions}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name} {where_clause} ").coalesce(
            num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} PARTITION({keys}) select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Partitioned table merge finished, db_name:{db_name}, table_name:{table_name}, partition={partitions} merged into {num_partitions} files\n{NORM_GRN}')
        return num_partitions
    else:
        pretty_print(f'{NORM_MGT}Non-partitioned table merge started, db_name:{db_name}, table_name:{table_name}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name}").coalesce(num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Non-partitioned table merge finished, db_name:{db_name}, table_name:{table_name} merged into {num_partitions} files\n{NORM_GRN}')
        return num_partitions


def hadoop_fs_test(hdfs_file):
    """
    Check whether a directory or file exists in HDFS
    Args:
        hdfs_file: HDFS path
    Returns:
        bool: True if the path exists, otherwise False
    """
    try:
        # hadoop fs -test -e returns 0 when the path exists
        result = subprocess.run(f"hadoop fs -test -e {hdfs_file}", shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        return result.returncode == 0
    except subprocess.CalledProcessError as e:
        print(f"Error while running the command: {e}")
        return False


def hadoop_fs_count(location_hdfs_path):
    """
    Run `hadoop fs -count -q -v` on a path and return (file count, content size in bytes),
    excluding a _SUCCESS marker file if one is present.
    """
    try:
        output = subprocess.check_output(f"hadoop fs -count -q -v {location_hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        if len(lines) >= 2:
            # With -v the first line is a header; the second line holds the counts
            parts = lines[1].split()
            num_files = int(parts[5])
            data_size = int(parts[6])
            if num_files >= 1:
                # Exclude the _SUCCESS marker file if it exists
                exists_success = hadoop_fs_test(f"{location_hdfs_path}/_SUCCESS")
                if exists_success:
                    num_files = num_files - 1
            return num_files, data_size
        return 0, 0
    except subprocess.CalledProcessError as e:
        # The path is empty or could not be counted
        print(f"The path is empty: {location_hdfs_path}")
        return 0, 0
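
# Sketch of the `hadoop fs -count -q -v` output parsed above (the values are illustrative
# only): with -v the first line is a header and the second line has the columns
#   QUOTA REM_QUOTA SPACE_QUOTA REM_SPACE_QUOTA DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
# so parts[5] is the file count and parts[6] is the content size in bytes, e.g.
#   none  inf  none  inf  1  42  1073741824  /user/hive/warehouse/tmp.db/some_table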

def merge_hdfs_file_absolute_path(file_path, file_type="text", file_merge_size=128, compression_type="none"):
    """
    Args:
        compression_type: compression codec
        file_path: absolute HDFS path
        file_merge_size: target merged file size in MB
        file_type: file format, e.g. orc, parquet, text
    Returns: the number of files after merging (num_partitions)
    """
    if file_type not in ['orc', 'parquet', 'text']:
        pretty_print(
            f'{NORM_MGT}Unsupported file type {file_type}; only orc, parquet and text are supported\n{NORM_GRN}')
        return
    # Get the HDFS file count and size and work out how many slices are needed
    file_count, file_size = hadoop_fs_count(file_path)
    # Skip the merge when the path contains no files
    if file_count == 0:
        print(f"The path {file_path} has no data files, or the files are corrupted")
        sys.exit()
    num_partitions = int(int(file_size) / file_merge_size / 1024 / 1024) + 1
    if num_partitions == file_count:
        print(f"The path {file_path} already has a suitable number of files; no small-file merge is needed")
        return
    # Rewrite the small files next to the original directory
    new_file_path = file_path + "merge"
    # Read the original HDFS path
    dataset = spark.read.format(file_type).load(file_path)
    pretty_print(f'{NORM_MGT}Merge started for path {file_path}, planned as {num_partitions} files\n{NORM_GRN}')
    # The source directory cannot be overwritten directly while it is being read, so write
    # to a new directory first and delete the old one afterwards
    if num_partitions >= 3:
        dataset = dataset.repartition(num_partitions)
    else:
        dataset = dataset.coalesce(num_partitions)
    dataset.write \
        .option("compression", compression_type) \
        .save(path=new_file_path, format=file_type, mode="overwrite")
    # Delete the original directory
    subprocess.run(f"hdfs dfs -rm -r {file_path}", shell=True)
    # Move the new directory into place
    subprocess.run(f"hdfs dfs -mv {new_file_path} {file_path}", shell=True)
    pretty_print(f'{NORM_MGT}Merge finished for path {file_path}, compacted into {num_partitions} files\n{NORM_GRN}')
    # Stop the session
    # session.stop()
    return num_partitions


def scan_dir_merge_file(scan_dir, file_type="text", file_merge_size=128, compression_type="gzip"):
    # List the first-level sub-directories and files of scan_dir, then merge each sub-directory
    _dirs, _files = list_hdfs_directory_contents(scan_dir)
    print(f"Directory {scan_dir} contains the following sub-directories: {_dirs}")
    print(f"Directory {scan_dir} contains the following files: {_files}")
    if len(_dirs) != 0:
        for _dir in _dirs:
            print(f"{'*' * 20} Merging small files under {_dir} {'*' * 20}")
            merge_hdfs_file_absolute_path(file_path=_dir, file_type=file_type, file_merge_size=file_merge_size,
                                          compression_type=compression_type)
    elif len(_files) != 0:
        print(f"Directory {scan_dir} has no sub-directories; merging the files directly under it")
        merge_hdfs_file_absolute_path(scan_dir, file_type=file_type, file_merge_size=file_merge_size,
                                      compression_type=compression_type)
    else:
        print(f"Directory {scan_dir} contains no files; skipping the merge")


def scan_hdfs_in_hive_tbl(tbl_name):
    """
    Args:
        tbl_name: table name qualified with its database (db_name.table_name)
    Scans the files of the given table and reports the file count, total size and the
    suggested number of files. For a whole-database scan, run SHOW TABLES yourself and
    call this method for every table (it is quite resource-intensive).
    Returns: a dict for a non-partitioned table, or a list of per-partition dicts
    """
    # Get the table details
    table_info = hive.sql(f"DESCRIBE FORMATTED {tbl_name}")
    # Get the table location (the root directory for a partitioned table)
    table_location = str(table_info.filter("col_name = 'Location'").select("data_type").collect()[0][0])[19:]
    # Check whether the table is partitioned
    is_partitioned = any(row["col_name"] == "# Partition Information" for row in table_info.collect())
    if is_partitioned:
        # Collect the space usage and small-file count of every partition
        partitions = hive.sql(f"SHOW PARTITIONS {tbl_name}").select("partition").collect()
        partition_reports = []
        # Iterate over the partitions
        for partition_row in partitions:
            partition = partition_row["partition"]
            location_name = table_location + "/" + partition
            # Get the HDFS metadata of the partition
            num_files, data_size = hadoop_fs_count(location_name)
            if num_files != 0 and data_size != 0:
                # Count the files smaller than 20 MB
                small_file_cnt = count_small_files(get_hdfs_file_sizes(location_name), 20 * 1024 * 1024)
                # Record table name, partition, space usage and small-file count
                partition_reports.append({
                    "table": tbl_name,
                    "partition": partition,
                    "space_usage": str(data_size) + "byte≈" + str(round(int(data_size) / 1024 / 1024, 3)) + "Mb",
                    "current_file_count": num_files,
                    "files_under_20Mb": small_file_cnt,
                    "suggested_file_count": int(int(data_size) / 128 / 1024 / 1024) + 1
                })
            else:
                partition_reports.append({
                    "table": tbl_name,
                    "partition": partition,
                    "space_usage": 0,
                    "current_file_count": 0,
                    "files_under_20Mb": 0,
                    "suggested_file_count": "no suggestion, the partition has no data"
                })
        return partition_reports
    else:
        # Get the HDFS metadata of the table
        num_files, data_size = hadoop_fs_count(table_location)
        if num_files != 0 and data_size != 0:
            # Count the files smaller than 20 MB
            small_file_cnt = count_small_files(get_hdfs_file_sizes(table_location), 20 * 1024 * 1024)
            # Return table name, space usage and small-file count
            return {
                "table": tbl_name,
                "partition": "not partitioned",
                "space_usage": str(data_size) + "byte≈" + str(round(int(data_size) / 1024 / 1024, 3)) + "Mb",
                "current_file_count": num_files,
                "files_under_20Mb": small_file_cnt,
                "suggested_file_count": int(int(data_size) / 128 / 1024 / 1024) + 1
            }
        else:
            return {
                "table": tbl_name,
                "partition": "not partitioned",
                "space_usage": 0,
                "current_file_count": 0,
                "files_under_20Mb": 0,
                "suggested_file_count": "no suggestion, the table has no data"
            }
    # spark.stop()


def get_hdfs_file_sizes(path):
    """
    Args:
        path: HDFS path
    Returns: a dict mapping file path to file size in bytes, or 0 if the path is empty
    """
    # Run hadoop fs -du to get the size of every file under the path
    command = f'hadoop fs -du {path}'
    result = subprocess.check_output(command, shell=True).decode("utf-8")
    # Parse the output and record the size of each file
    file_sizes = {}
    lines = result.strip().split("\n")
    if not any(lines):
        return 0
    else:
        for line in lines:
            # Columns are separated by a variable amount of whitespace: size [disk usage] path
            parts = line.split()
            file_sizes[parts[-1]] = parts[0]
        return file_sizes


def count_small_files(file_sizes, threshold):
    """
    Args:
        file_sizes: dict mapping file path to file size
        threshold: small-file size threshold in bytes
    Returns: the number of files smaller than the threshold
    """
    # Count the files below the given threshold
    if file_sizes == 0:
        return 0
    else:
        count = 0
        for file_path, size in file_sizes.items():
            if int(size) < threshold:
                count += 1
        return count


def scan_hdfs_in_db(db_name):
    # List every table in the given database
    db_tables = hive.sql(f"SHOW TABLES IN {db_name}").collect()
    # Scan each table in turn
    for table in db_tables:
        print(scan_hdfs_in_hive_tbl(f"{db_name}.{table.tableName}"))
    hive.stop()
    spark.stop()


def list_hdfs_directory_contents(hdfs_path):
    """
    List the directories and files directly under an HDFS path (non-recursive)
    Args:
        hdfs_path (str): HDFS path
    Returns:
        tuple: (dirs, files) where dirs is the list of directories and files is the list of files
    """
    try:
        # Run hadoop fs -ls
        output = subprocess.check_output(f"hadoop fs -ls {hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        dirs = []
        files = []
        for line in lines:
            parts = line.split()
            if len(parts) >= 8:
                path = parts[-1]
                if parts[0].startswith("d"):
                    # Directory
                    dirs.append(path)
                else:
                    # File
                    files.append(path)
        return dirs, files
    except subprocess.CalledProcessError as e:
        print(f"Error executing HDFS command: {e}")
        return [], []


if __name__ == '__main__':
    # nums = hdfs_estimate_num_partitions('ent_ods', 'ent_globiz_mg_companies', '19700101', 128)
    # print(nums)
    # collect_hdfs_file_test('20240512', 'ent_raw', 'ent_crawler_base', 'ent_bing_crawler')
    # merge_hdfs_file('tmp', 'ent_shh_api_company_logs_tmp2', None, 128)
    # merge_hdfs_file_absolute_path("/user/hive/warehouse/tmp.db/cts_peru_cpdl_fill", file_type='text', file_merge_size=128)
    # print(scan_hdfs_in_hive_tbl('tmp.country_mapping_514'))
    scan_hdfs_in_db('tmp')
    # merge_hdfs_file('ent_dwd', 'ent_venezuela_biz_basic', 'dt=19700101', 128)
    # get_hive_mete()
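    # Hypothetical example (not from the original script): merge the small files under every
    # first-level sub-directory of a raw-data landing path; the path below is only illustrative.
    # scan_dir_merge_file("/user/dev005/workspace/flume_test", file_type="text",
    #                     file_merge_size=128, compression_type="gzip")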