datax-hdfs-export-starter.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
  5. 对应 jobs/ads/ 场景。
  6. 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
  7. -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax / -skip-check
  8. HDFS 源存在性 check 默认开启,missing/empty → 任务失败(与老 silent skip 语义反转,
  9. 配合 DS 告警"没数据=异常"场景);手动跑批知道分区空可加 -skip-check 跳过。
  10. """
  11. import argparse
  12. import os
  13. import sys
  14. from datetime import date, timedelta
  15. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  16. sys.path.append(project_root_dir)
  17. from dw_base.datax.entry import DataxExport
  18. def main():
  19. parser = argparse.ArgumentParser(
  20. prog='datax-hdfs-export-starter',
  21. description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
  22. )
  23. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  24. help='DataX ini 单文件(可多次)')
  25. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  26. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  27. parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
  28. help='默认昨天(对齐老 sh 行为)')
  29. parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
  30. help='默认今天(对齐老 sh 行为)')
  31. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  32. help='显式指定 worker(优先于 -random)')
  33. parser.add_argument('-random', action='store_true', dest='use_random',
  34. help='从 conf/workers.ini 加权随机选 worker')
  35. parser.add_argument('-parallel', action='store_true',
  36. help='并行执行(默认串行)')
  37. parser.add_argument('-skip-datax', action='store_true',
  38. help='只生成 json 不执行 datax.py')
  39. parser.add_argument('-skip-check', action='store_true',
  40. help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
  41. args = parser.parse_args()
  42. # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
  43. if not args.start_date:
  44. args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  45. if not args.stop_date:
  46. args.stop_date = date.today().strftime('%Y%m%d')
  47. print('{script} 收到参数: {argv}'.format(
  48. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  49. ))
  50. print(' start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date))
  51. exporter = DataxExport(
  52. base_dir=project_root_dir,
  53. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  54. release_user=os.environ['RELEASE_USER'],
  55. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  56. python3_path=os.environ['PYTHON3_PATH'],
  57. datax_home=os.environ['DATAX_HOME'],
  58. log_root_dir=os.environ['LOG_ROOT_DIR'],
  59. )
  60. failed = exporter.run(
  61. inis=args.ini,
  62. inis_dirs=args.inis,
  63. start_date=args.start_date,
  64. stop_date=args.stop_date,
  65. host=args.host,
  66. use_random=args.use_random,
  67. parallel=args.parallel,
  68. skip_datax=args.skip_datax,
  69. skip_check=args.skip_check,
  70. )
  71. sys.exit(failed)
  72. if __name__ == '__main__':
  73. main()