datax-hdfs-export-starter.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
  5. 对应 jobs/ads/ 场景。
  6. 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
  7. -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax
  8. 注:源 HDFS 路径存在性 check(老 check_data_exists 行为)本版本暂未搬迁,
  9. 后续按需补;当前源路径不存在时 datax.py 直接报错。
  10. """
  11. import argparse
  12. import os
  13. import sys
  14. from datetime import date, timedelta
  15. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  16. sys.path.append(project_root_dir)
  17. from dw_base.datax.entry import DataxExport
  18. def main():
  19. parser = argparse.ArgumentParser(
  20. prog='datax-hdfs-export-starter',
  21. description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
  22. )
  23. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  24. help='DataX ini 单文件(可多次)')
  25. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  26. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  27. parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
  28. help='默认昨天(对齐老 sh 行为)')
  29. parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
  30. help='默认今天(对齐老 sh 行为)')
  31. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  32. help='显式指定 worker(优先于 -random)')
  33. parser.add_argument('-random', action='store_true', dest='use_random',
  34. help='从 conf/workers.ini 加权随机选 worker')
  35. parser.add_argument('-parallel', action='store_true',
  36. help='并行执行(默认串行)')
  37. parser.add_argument('-skip-datax', action='store_true',
  38. help='只生成 json 不执行 datax.py')
  39. args = parser.parse_args()
  40. # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
  41. if not args.start_date:
  42. args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  43. if not args.stop_date:
  44. args.stop_date = date.today().strftime('%Y%m%d')
  45. print('{script} 收到参数: {argv}'.format(
  46. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  47. ))
  48. print(' start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date))
  49. exporter = DataxExport(
  50. base_dir=project_root_dir,
  51. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  52. release_user=os.environ['RELEASE_USER'],
  53. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  54. python3_path=os.environ['PYTHON3_PATH'],
  55. datax_home=os.environ['DATAX_HOME'],
  56. log_root_dir=os.environ['LOG_ROOT_DIR'],
  57. )
  58. failed = exporter.run(
  59. inis=args.ini,
  60. inis_dirs=args.inis,
  61. start_date=args.start_date,
  62. stop_date=args.stop_date,
  63. host=args.host,
  64. use_random=args.use_random,
  65. parallel=args.parallel,
  66. skip_datax=args.skip_datax,
  67. )
  68. sys.exit(failed)
  69. if __name__ == '__main__':
  70. main()