datax-hdfs-export-starter.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
  5. 对应 jobs/ads/ 场景。
  6. 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
  7. -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax
  8. 注:源 HDFS 路径存在性 check(老 check_data_exists 行为)本版本暂未搬迁,
  9. 后续按需补;当前源路径不存在时 datax.py 直接报错。
  10. """
  11. import argparse
  12. import os
  13. import sys
  14. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  15. sys.path.append(project_root_dir)
  16. from dw_base.datax.entry import DataxExport
  17. def main():
  18. parser = argparse.ArgumentParser(
  19. prog='datax-hdfs-export-starter',
  20. description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
  21. )
  22. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  23. help='DataX ini 单文件(可多次)')
  24. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  25. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  26. parser.add_argument('-start-date', required=True, metavar='YYYYMMDD')
  27. parser.add_argument('-stop-date', required=True, metavar='YYYYMMDD')
  28. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  29. help='显式指定 worker(优先于 -random)')
  30. parser.add_argument('-random', action='store_true', dest='use_random',
  31. help='从 conf/workers.ini 加权随机选 worker')
  32. parser.add_argument('-parallel', action='store_true',
  33. help='并行执行(默认串行)')
  34. parser.add_argument('-skip-datax', action='store_true',
  35. help='只生成 json 不执行 datax.py')
  36. args = parser.parse_args()
  37. print('{script} 收到参数: {argv}'.format(
  38. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  39. ))
  40. exporter = DataxExport(
  41. base_dir=project_root_dir,
  42. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  43. release_user=os.environ['RELEASE_USER'],
  44. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  45. python3_path=os.environ['PYTHON3_PATH'],
  46. datax_home=os.environ['DATAX_HOME'],
  47. log_root_dir=os.environ['LOG_ROOT_DIR'],
  48. )
  49. failed = exporter.run(
  50. inis=args.ini,
  51. inis_dirs=args.inis,
  52. start_date=args.start_date,
  53. stop_date=args.stop_date,
  54. host=args.host,
  55. use_random=args.use_random,
  56. parallel=args.parallel,
  57. skip_datax=args.skip_datax,
  58. )
  59. sys.exit(failed)
  60. if __name__ == '__main__':
  61. main()