import os
import re
import subprocess
import sys

# Derive the project root from this file's path so that dw_base is importable
abspath = os.path.abspath(__file__)
root_path = re.sub(r"poyee-data-warehouse.*", "poyee-data-warehouse", abspath)
sys.path.append(root_path)

from pyspark.sql import SparkSession
from dw_base import NORM_MGT, NORM_GRN
from dw_base.utils.log_utils import pretty_print
- """
- author: HQL
- create_time:2024-05-13
- update_time:2024-05-14
- remarks: 该脚本内方法为了实现hdfs小文件文件数据合并的方案,其中有两种方式合并
- 1.merge_hdfs_file(db_name, table_name, partition_name, file_merge_size) 建议就是hive数据表方式(不支持外部表,外部表下面得绝对路径)
- db_name: 数据库名称
- table_name: 表名称
- partition_name: 分区目录,支持多级,写法为 例如dt=20240202/topic=aaaa
- file_merge_size: 合并文件大小,默认128MB
- 2.merge_hdfs_file_absolute_path(file_path, file_merge_size, file_type) 建议直接对文件绝对路径下的数据进行合并(例如:/user/dev005/workspace/flume_test/dt=20240418)
- file_path: 文件绝对路径
- file_merge_size: 文件拆分大小
- file_type: 文件类型,例如orc,parquet,text,默认text
- 3.scan_hdfs_in_hive_tbl(table_name) 传入指定数据库下表名(db_name.table_name),返回当前表优化建议和小文件数据量以及空间占用情况
- 如需实现全库扫描,需要自己写个show tables ,然后遍历scan_hdfs_in_hive_tbl方法传入表名(采用dfs语法,速度一般,还是建议运维直接拉mysql中hive元数据)
- """
def get_spark_session():
    """
    Returns: a configured SparkSession.Builder (call .getOrCreate() to obtain the session)
    """
    builder = SparkSession.builder \
        .appName("collect_hdfs_file") \
        .master("yarn") \
        .config('spark.default.parallelism', 8) \
        .config('spark.driver.cores', 1) \
        .config('spark.driver.memory', '1g') \
        .config('spark.executor.cores', 2) \
        .config('spark.executor.instances', 4) \
        .config('spark.executor.memory', '6g') \
        .config('spark.sql.shuffle.partitions', '8') \
        .config('spark.yarn.queue', 'spark') \
        .config('spark.sql.hive.convertMetastoreOrc', 'false')
    return builder

def get_hive_mete():
    """
    Reads the TBLS table of the Hive metastore (MySQL) for inspection.
    """
    df = spark.read.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://m3:3306/hive") \
        .option("dbtable", "TBLS") \
        .option("user", "hive") \
        .option("password", "Tendata_hive2024") \
        .load()
    df.show(10)

def get_hive_context():
    """
    Returns: a Hive-enabled SparkSession
    """
    builder = get_spark_session()
    hive_session = builder.enableHiveSupport() \
        .config('hive.input.format', 'org.apache.hadoop.hive.ql.io.CombineHiveInputFormat') \
        .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
        .config('hive.exec.dynamic.partition', 'true') \
        .getOrCreate()
    return hive_session


# getOrCreate() returns one shared SparkSession per JVM, so the Hive-enabled session is
# created first; creating a plain session before it would make Spark silently ignore the
# Hive-specific configs above. `spark` and `hive` thus refer to the same session.
hive = get_hive_context()
spark = hive

def hdfs_estimate_num_partitions(db_name, table_name, partition_name, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partition_name: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    if file_merge_size is None or file_merge_size == '':
        file_merge_size = 128
    if db_name is None or table_name is None or partition_name == '':
        pretty_print(
            f'{NORM_MGT}Invalid arguments: db_name:{db_name}, table_name:{table_name}, partition_name:{partition_name}\n{NORM_GRN}')
        sys.exit()
    if partition_name is None:
        pretty_print(f'{NORM_MGT}Non-partitioned table\n{NORM_GRN}')
    # Default warehouse root
    default_warehouse_path = "/user/hive/warehouse/"
    # Absolute path of the table (or partition) directory
    if partition_name is not None:
        file_path = default_warehouse_path + db_name + ".db/" + table_name + "/" + partition_name
    else:
        file_path = default_warehouse_path + db_name + ".db/" + table_name
    # Build and run the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = int(result.strip().split()[0])
    pretty_print(f'{NORM_MGT}{file_path}: {file_size} bytes in total under this path before compaction\n{NORM_GRN}')
    # Split by file_merge_size (MB) to get the number of slices
    if file_size == 0:
        return 0
    return int(file_size / file_merge_size / 1024 / 1024) + 1

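# Worked example of the slice formula above (illustrative numbers): a 1 GiB partition
# with file_merge_size=128 gives int(1073741824 / 128 / 1024 / 1024) + 1 = 8 + 1 = 9
# slices; the trailing +1 keeps the result at least 1 for any non-empty path.
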
def hdfs_estimate_num_partitions_absolute_path(file_path, file_merge_size=128):
    """
    Args:
        file_path: absolute file path
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    # Build and run the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = int(result.strip().split()[0])
    pretty_print(f'{NORM_MGT}{file_path}: {file_size} bytes in total under this path before compaction\n{NORM_GRN}')
    # Split by file_merge_size (MB) to get the number of slices
    if file_size == 0:
        return 0
    return int(file_size / file_merge_size / 1024 / 1024) + 1

def merge_hdfs_file(db_name, table_name, partitions, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partitions: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of files after merging (num_partitions)
    """
    if db_name is None or table_name is None or partitions == '':
        pretty_print(
            f'{NORM_MGT}Invalid arguments: db_name:{db_name}, table_name:{table_name}, partition_name:{partitions}\n{NORM_GRN}')
        sys.exit()
    # Estimate how many slices the data should be compacted into
    num_partitions = hdfs_estimate_num_partitions(db_name, table_name, partitions, file_merge_size)
    # If the partition contains no files, skip the merge
    if num_partitions == 0:
        print(f"Table {table_name} has no file data under partition {partitions}, or the files are corrupted")
        sys.exit()
    # Partitioned and non-partitioned tables are handled separately
    if partitions is not None:
        # Handle multi-level partitions
        partition_list = partitions.split('/')
        # Extract partition key/value pairs
        conditions = [segment.split('=') for segment in partition_list]
        # Extract the partition keys
        keys = ','.join([segment.split('=')[0] for segment in partition_list])
        # Build the filter condition
        condition_str = ' and '.join([f"{key}='{value}'" for key, value in conditions])
        # Assemble the WHERE clause
        where_clause = f"WHERE {condition_str}"
        pretty_print(
            f'{NORM_MGT}Partitioned table: starting merge db_name:{db_name}, table_name:{table_name}, partition={partitions}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name} {where_clause} ").coalesce(
            num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} PARTITION({keys}) select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Partitioned table merge finished: db_name:{db_name}, table_name:{table_name}, partition={partitions} merged into {num_partitions} files\n{NORM_GRN}')
        return num_partitions
    else:
        pretty_print(f'{NORM_MGT}Non-partitioned table: starting merge db_name:{db_name}, table_name:{table_name}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name}").coalesce(num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Non-partitioned table merge finished: db_name:{db_name}, table_name:{table_name} merged into {num_partitions} files\n{NORM_GRN}')
        return num_partitions

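# Illustrative walk-through of the partition-spec parsing above (hypothetical values):
#   partitions    = 'dt=20240202/topic=aaaa'
#   keys          -> 'dt,topic'
#   where_clause  -> "WHERE dt='20240202' and topic='aaaa'"
# so the rewrite reads only the target partition and overwrites it in place.
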
def hadoop_fs_test(hdfs_file):
    """
    Check whether the given directory or file exists on HDFS.
    Args:
        hdfs_file: HDFS path
    Returns:
        bool: True if it exists, otherwise False
    """
    try:
        # `hadoop fs -test -e` returns exit code 0 if the path exists
        result = subprocess.run(f"hadoop fs -test -e {hdfs_file}", shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return result.returncode == 0
    except subprocess.CalledProcessError as e:
        print(f"Error while executing the command: {e}")
        return False

def hadoop_fs_count(location_hdfs_path):
    """
    Returns (file count, content size in bytes) for the given HDFS path, based on
    `hadoop fs -count -q -v`; a _SUCCESS marker file is excluded from the count.
    """
    try:
        output = subprocess.check_output(f"hadoop fs -count -q -v {location_hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        if len(lines) >= 2:
            # The second line holds the values; parts[5] is FILE_COUNT, parts[6] is CONTENT_SIZE
            parts = lines[1].split()
            num_files = int(parts[5])
            data_size = int(parts[6])
            if num_files >= 1:
                # Exclude the _SUCCESS marker file if present
                exists_success = hadoop_fs_test(f"{location_hdfs_path}/_SUCCESS")
                if exists_success:
                    num_files -= 1
            return num_files, data_size
    except subprocess.CalledProcessError:
        # The path is empty or inaccessible
        print(f"The path is empty: {location_hdfs_path}")
    return 0, 0

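# Illustrative `hadoop fs -count -q -v` output (the values are made up); the second
# line is the one parsed above, with parts[5] = FILE_COUNT and parts[6] = CONTENT_SIZE:
#
#   QUOTA REM_QUOTA SPACE_QUOTA REM_SPACE_QUOTA DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
#    none       inf        none             inf         1         42     13312000 /user/hive/warehouse/tmp.db/t
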
def merge_hdfs_file_absolute_path(file_path, file_type="text", file_merge_size=128, compression_type="none"):
    """
    Args:
        compression_type: compression codec
        file_path: absolute file path
        file_merge_size: target split size in MB
        file_type: file format, e.g. orc, parquet or text
    Returns: the number of files after merging (num_partitions)
    """
    if file_type not in ['orc', 'parquet', 'text']:
        pretty_print(
            f'{NORM_MGT}Unsupported file type {file_type}; orc, parquet and text are currently supported\n{NORM_GRN}')
        return
    # Get the size of the HDFS data and work out how many slices it should be split into
    file_count, file_size = hadoop_fs_count(file_path)
    # If the path contains no files, skip the merge
    if file_count == 0:
        print(f"No file data under {file_path}, or the files are corrupted")
        sys.exit()
    num_partitions = int(file_size / file_merge_size / 1024 / 1024) + 1
    if num_partitions == file_count:
        print(f"The file count under {file_path} is already appropriate; no small-file merge needed")
        return
    # Rewrite the small files into a sibling directory named <file_path>merge
    new_file_path = file_path + "merge"
    # Read the original HDFS path
    dataset = spark.read.format(file_type).load(file_path)
    pretty_print(f'{NORM_MGT}Merge started for {file_path}, pre-split into {num_partitions} files\n{NORM_GRN}')
    # Spark cannot overwrite the directory it is reading from, so write to a new
    # directory first, then delete the old one and move the new one into place
    if num_partitions >= 3:
        dataset = dataset.repartition(num_partitions)
    else:
        dataset = dataset.coalesce(num_partitions)
    dataset.write \
        .option("compression", compression_type) \
        .save(path=new_file_path, format=file_type, mode="overwrite")
    # Delete the original directory
    subprocess.run(f"hdfs dfs -rm -r {file_path}", shell=True)
    # Move the merged directory into place
    subprocess.run(f"hdfs dfs -mv {new_file_path} {file_path}", shell=True)
    pretty_print(f'{NORM_MGT}Merge finished for {file_path}, compacted into {num_partitions} files\n{NORM_GRN}')
    # Close the session
    # session.stop()
    return num_partitions

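# Hedged usage sketch (the path is hypothetical): merge an ORC directory into ~256 MB
# files with snappy compression, then re-check the layout:
#
#   merge_hdfs_file_absolute_path('/user/dev005/workspace/flume_test/dt=20240418',
#                                 file_type='orc', file_merge_size=256, compression_type='snappy')
#   print(hadoop_fs_count('/user/dev005/workspace/flume_test/dt=20240418'))
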
def scan_dir_merge_file(scan_dir, file_type="text", file_merge_size=128, compression_type="gzip"):
    """
    Scan a directory (non-recursively) and merge the small files in each subdirectory,
    or in the directory itself if it has no subdirectories.
    """
    _dirs, _files = list_hdfs_directory_contents(scan_dir)
    print(f"Directories under {scan_dir}: {_dirs}")
    print(f"Files under {scan_dir}: {_files}")
    if len(_dirs) != 0:
        for _dir in _dirs:
            print(f"{'*' * 20} Merging small files under {_dir} {'*' * 20}")
            merge_hdfs_file_absolute_path(file_path=_dir, file_type=file_type, file_merge_size=file_merge_size,
                                          compression_type=compression_type)
    elif len(_files) != 0:
        print(f"{scan_dir} has no subdirectories; merging the files in the directory itself")
        merge_hdfs_file_absolute_path(scan_dir, file_type=file_type, file_merge_size=file_merge_size,
                                      compression_type=compression_type)
    else:
        print(f"No files under {scan_dir}; skipping the merge")

def scan_hdfs_in_hive_tbl(tbl_name):
    """
    Args:
        tbl_name: table name qualified with its database (db_name.table_name)
    Scans the files under the given table and reports the file count, total size and
    a suggested file count. To scan a whole database, run SHOW TABLES and feed each
    table into this function; note that this is resource-intensive.
    Returns: a list of per-partition stats for partitioned tables, or a single dict
    for non-partitioned tables
    """
    # Fetch the table details
    table_info = hive.sql(f"DESCRIBE FORMATTED {tbl_name}")
    # Table root directory (the root of all partitions for a partitioned table);
    # the slice strips the fixed-length scheme prefix (e.g. hdfs://...) of this cluster
    table_location = str(table_info.filter("col_name = 'Location'").select("data_type").collect()[0][0])[19:]
    # Check whether the table is partitioned
    is_partitioned = any(row["col_name"] == "# Partition Information" for row in table_info.collect())
    if is_partitioned:
        # Collect the space usage and small-file count of every partition
        partitions = hive.sql(f"SHOW PARTITIONS {tbl_name}").select("partition").collect()
        results = []
        # Iterate over the partitions, accumulating one stats dict per partition
        for partition_row in partitions:
            partition = partition_row["partition"]
            location_name = table_location + "/" + partition
            # Fetch the HDFS metadata
            num_files, data_size = hadoop_fs_count(location_name)
            if num_files != 0 and data_size != 0:
                # Count files smaller than 20 MB
                small_file_cnt = count_small_files(get_hdfs_file_sizes(location_name), 20 * 1024 * 1024)
                results.append({
                    "table": tbl_name,
                    "partition": partition,
                    "space_used": f"{data_size}byte≈{round(data_size / 1024 / 1024, 3)}Mb",
                    "file_count": num_files,
                    "files_under_20Mb": small_file_cnt,
                    "suggested_file_count": int(data_size / 128 / 1024 / 1024) + 1
                })
            else:
                results.append({
                    "table": tbl_name,
                    "partition": partition,
                    "space_used": 0,
                    "file_count": 0,
                    "files_under_20Mb": 0,
                    "suggested_file_count": "no suggestion, the partition is empty"
                })
        return results
    else:
        # Fetch the HDFS metadata
        num_files, data_size = hadoop_fs_count(table_location)
        if num_files != 0 and data_size != 0:
            # Count files smaller than 20 MB
            small_file_cnt = count_small_files(get_hdfs_file_sizes(table_location), 20 * 1024 * 1024)
            return {
                "table": tbl_name,
                "partition": "not partitioned",
                "space_used": f"{data_size}byte≈{round(data_size / 1024 / 1024, 3)}Mb",
                "file_count": num_files,
                "files_under_20Mb": small_file_cnt,
                "suggested_file_count": int(data_size / 128 / 1024 / 1024) + 1
            }
        else:
            return {
                "table": tbl_name,
                "partition": "not partitioned",
                "space_used": 0,
                "file_count": 0,
                "files_under_20Mb": 0,
                "suggested_file_count": "no suggestion, the table is empty"
            }
    # spark.stop()

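# Illustrative shape of the stats returned for a non-partitioned table (made-up values):
#   {'table': 'tmp.some_table', 'partition': 'not partitioned',
#    'space_used': '13312000byte≈12.695Mb', 'file_count': 42,
#    'files_under_20Mb': 40, 'suggested_file_count': 1}
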
def get_hdfs_file_sizes(path):
    """
    Args:
        path: HDFS path
    Returns: a dict mapping each file path to its size in bytes
    """
    # Run hadoop fs -du to get the per-file sizes
    command = f'hadoop fs -du {path}'
    result = subprocess.check_output(command, shell=True).decode("utf-8")
    # Parse the output: the first column is the size, the last column is the file path
    file_sizes = {}
    lines = result.strip().split("\n")
    if not any(lines):
        return 0
    for line in lines:
        line_split = line.split()
        file_sizes[line_split[-1]] = line_split[0]
    return file_sizes

def count_small_files(file_sizes, threshold):
    """
    Args:
        file_sizes: dict mapping file path to file size
        threshold: small-file size threshold in bytes
    Returns: the number of files smaller than the threshold
    """
    # Count the files below the given threshold
    if file_sizes == 0:
        return 0
    count = 0
    for file_path, size in file_sizes.items():
        if int(size) < threshold:
            count += 1
    return count

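# Illustrative example (made-up sizes): with a 20 MB threshold, only the 1 MiB file counts:
#   count_small_files({'/p/a': '1048576', '/p/b': '33554432'}, 20 * 1024 * 1024)  # -> 1
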
def scan_hdfs_in_db(db_name):
    """
    Scan every table in the given database and print its small-file stats.
    """
    # List all tables in the database
    db_tables = hive.sql(f"SHOW TABLES IN {db_name}").collect()
    # Scan each table in turn
    for table in db_tables:
        print(scan_hdfs_in_hive_tbl(f"{db_name}.{table.tableName}"))
    hive.stop()
    spark.stop()

def list_hdfs_directory_contents(hdfs_path):
    """
    List all directory names and file names directly under an HDFS path (non-recursive).
    Args:
        hdfs_path (str): HDFS path
    Returns:
        tuple: (dirs, files) where dirs is the list of directories and files the list of files
    """
    try:
        # Run hadoop fs -ls
        output = subprocess.check_output(f"hadoop fs -ls {hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        dirs = []
        files = []
        for line in lines:
            parts = line.split()
            if len(parts) >= 8:
                path = parts[-1]
                if parts[0].startswith("d"):  # directory
                    dirs.append(path)
                else:  # file
                    files.append(path)
        return dirs, files
    except subprocess.CalledProcessError as e:
        print(f"Error executing HDFS command: {e}")
        return [], []

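# Illustrative `hadoop fs -ls` entry (made-up values); a leading 'd' in the first
# column marks a directory, and the last column is the path picked up above:
#   drwxr-xr-x   - hive hive          0 2024-05-13 10:00 /user/hive/warehouse/tmp.db/t/dt=20240202
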
if __name__ == '__main__':
    # nums = hdfs_estimate_num_partitions('ent_ods', 'ent_globiz_mg_companies', '19700101', 128)
    # print(nums)
    # collect_hdfs_file_test('20240512', 'ent_raw', 'ent_crawler_base', 'ent_bing_crawler')
    # merge_hdfs_file('tmp', 'ent_shh_api_company_logs_tmp2', None, 128)
    # merge_hdfs_file_absolute_path("/user/hive/warehouse/tmp.db/cts_peru_cpdl_fill", file_type='text', file_merge_size=128)
    # print(scan_hdfs_in_hive_tbl('tmp.country_mapping_514'))
    scan_hdfs_in_db('tmp')
    # merge_hdfs_file('ent_dwd', 'ent_venezuela_biz_basic', 'dt=19700101', 128)
    # get_hive_mete()