hdfs_dir_file_coalesce.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. # encoding: utf8
  2. import sys
  3. import argparse
  4. """
  5. 输入hdfs目录、存储类型、合并文件大小、压缩类型:
  6. 1、如果该目录下是二级目录,则遍历每个二级目录,每个二级目录中的小文件做合并
  7. 2、如果该目录下是文件,则合并该目录下的小文件
  8. """
  9. def parse_arguments():
  10. parser = argparse.ArgumentParser(description="Merge small files in HDFS")
  11. # hdfs路径,可以输入多个。例如:--path /tmp/a /tmp/b /tmp/c
  12. parser.add_argument('--path', type=str, nargs='+', help='Directory to scan for files')
  13. parser.add_argument('--file_type', type=str, default='text', choices=["text", "orc", "parquet"],
  14. help='Type of files to merge (default: text, support: orc,parquet,text )')
  15. parser.add_argument('--merge_size', type=int, default=128,
  16. help='Size in MB for merging files (default: 128)')
  17. parser.add_argument('--compression', type=str, default='gzip',
  18. help='Compression type (default: gzip)')
  19. # 解析命令行参数
  20. return parser.parse_args()
  21. if __name__ == '__main__':
  22. args = parse_arguments()
  23. print(f"任务参数:{args}")
  24. hdfs_path_list = args.path
  25. file_type = args.file_type
  26. merge_size = args.merge_size
  27. compression = args.compression
  28. from dw_base.utils.hdfs_merge_small_file import scan_dir_merge_file, hadoop_fs_test
  29. for hdfs_path in hdfs_path_list:
  30. if hdfs_path.strip() == "":
  31. print(f"当前参数传入的HDFS路径为空,请输入正确的HDFS路径")
  32. sys.exit()
  33. is_exists = hadoop_fs_test(hdfs_path)
  34. if is_exists:
  35. scan_dir_merge_file(hdfs_path, file_type=file_type, file_merge_size=merge_size,
  36. compression_type=compression)
  37. print("")
  38. else:
  39. print(f"HDFS路径: {hdfs_path} 不存在,请检查.")