# encoding: utf8 import sys import argparse """ 输入hdfs目录、存储类型、合并文件大小、压缩类型: 1、如果该目录下是二级目录,则遍历每个二级目录,每个二级目录中的小文件做合并 2、如果该目录下是文件,则合并该目录下的小文件 """ def parse_arguments(): parser = argparse.ArgumentParser(description="Merge small files in HDFS") # hdfs路径,可以输入多个。例如:--path /tmp/a /tmp/b /tmp/c parser.add_argument('--path', type=str, nargs='+', help='Directory to scan for files') parser.add_argument('--file_type', type=str, default='text', choices=["text", "orc", "parquet"], help='Type of files to merge (default: text, support: orc,parquet,text )') parser.add_argument('--merge_size', type=int, default=128, help='Size in MB for merging files (default: 128)') parser.add_argument('--compression', type=str, default='gzip', help='Compression type (default: gzip)') # 解析命令行参数 return parser.parse_args() if __name__ == '__main__': args = parse_arguments() print(f"任务参数:{args}") hdfs_path_list = args.path file_type = args.file_type merge_size = args.merge_size compression = args.compression from dw_base.utils.hdfs_merge_small_file import scan_dir_merge_file, hadoop_fs_test for hdfs_path in hdfs_path_list: if hdfs_path.strip() == "": print(f"当前参数传入的HDFS路径为空,请输入正确的HDFS路径") sys.exit() is_exists = hadoop_fs_test(hdfs_path) if is_exists: scan_dir_merge_file(hdfs_path, file_type=file_type, file_merge_size=merge_size, compression_type=compression) print("") else: print(f"HDFS路径: {hdfs_path} 不存在,请检查.")