# encoding: utf8
"""Merge small files under HDFS directories.

Given one or more HDFS paths, a storage format, a target merge size and a
compression type:
1. If a path contains second-level directories, iterate over them and merge
   the small files inside each second-level directory.
2. If a path contains files directly, merge the small files in that path.
"""
import sys
import argparse
def parse_arguments(argv=None):
    """Parse command-line arguments for the HDFS small-file merge job.

    Args:
        argv: Optional list of argument strings. Defaults to ``None``, in
            which case argparse falls back to ``sys.argv[1:]`` — existing
            callers that pass nothing are unaffected.

    Returns:
        argparse.Namespace with attributes ``path`` (list of HDFS paths),
        ``file_type``, ``merge_size`` and ``compression``.
    """
    parser = argparse.ArgumentParser(description="Merge small files in HDFS")
    # One or more HDFS paths, e.g.: --path /tmp/a /tmp/b /tmp/c
    # required=True: without it args.path would be None and the caller's
    # iteration over the paths would crash with a TypeError.
    parser.add_argument('--path', type=str, nargs='+', required=True,
                        help='Directory to scan for files')
    parser.add_argument('--file_type', type=str, default='text',
                        choices=["text", "orc", "parquet"],
                        help='Type of files to merge (default: text, support: orc,parquet,text )')
    parser.add_argument('--merge_size', type=int, default=128,
                        help='Size in MB for merging files (default: 128)')
    parser.add_argument('--compression', type=str, default='gzip',
                        help='Compression type (default: gzip)')
    return parser.parse_args(argv)
if __name__ == '__main__':
    args = parse_arguments()
    print(f"任务参数:{args}")
    hdfs_path_list = args.path
    file_type = args.file_type
    merge_size = args.merge_size
    compression = args.compression
    # Imported lazily so that merely importing this module (e.g. for
    # parse_arguments) does not pull in the HDFS helper dependencies.
    from dw_base.utils.hdfs_merge_small_file import scan_dir_merge_file, hadoop_fs_test
    for hdfs_path in hdfs_path_list:
        if hdfs_path.strip() == "":
            # Blank path argument: abort the whole run. Exit non-zero so a
            # scheduler/wrapper sees this as a failure (the original
            # sys.exit() reported success with status 0).
            print("当前参数传入的HDFS路径为空,请输入正确的HDFS路径")
            sys.exit(1)
        is_exists = hadoop_fs_test(hdfs_path)
        if is_exists:
            scan_dir_merge_file(hdfs_path, file_type=file_type,
                                file_merge_size=merge_size,
                                compression_type=compression)
            print("")  # blank line separating per-path output
        else:
            print(f"HDFS路径: {hdfs_path} 不存在,请检查.")