datax-hdfs-export-starter.py 4.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
  5. 对应 jobs/ads/ 场景。
  6. 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
  7. -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax / -skip-check
  8. HDFS 源存在性 check 默认开启,missing/empty → 任务失败(与老 silent skip 语义反转,
  9. 配合 DS 告警"没数据=异常"场景);手动跑批知道分区空可加 -skip-check 跳过。
  10. """
  11. import argparse
  12. import os
  13. import sys
  14. from datetime import date, timedelta
  15. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  16. sys.path.append(project_root_dir)
  17. from dw_base.datax.entry import DataxExport
  18. def main():
  19. parser = argparse.ArgumentParser(
  20. prog='datax-hdfs-export-starter',
  21. description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
  22. )
  23. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  24. help='DataX ini 单文件(可多次)')
  25. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  26. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  27. parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
  28. help='默认昨天(对齐老 sh 行为)')
  29. parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
  30. help='默认今天(对齐老 sh 行为)')
  31. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  32. help='显式指定 worker(优先于 -random)')
  33. parser.add_argument('-random', action='store_true', dest='use_random',
  34. help='从 conf/workers.ini 加权随机选 worker')
  35. parser.add_argument('-parallel', action='store_true',
  36. help='并行执行(默认串行)')
  37. parser.add_argument('-skip-datax', action='store_true',
  38. help='只生成 json 不执行 datax.py')
  39. parser.add_argument('-skip-check', action='store_true',
  40. help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
  41. # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数)
  42. parser.add_argument('-channel', type=int, default=None,
  43. help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
  44. parser.add_argument('-byte', type=int, default=None,
  45. help='L3 speed.byte 覆盖(单位 bytes)')
  46. parser.add_argument('-record', type=int, default=None,
  47. help='L3 speed.record 覆盖')
  48. args = parser.parse_args()
  49. # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
  50. if not args.start_date:
  51. args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  52. if not args.stop_date:
  53. args.stop_date = date.today().strftime('%Y%m%d')
  54. print('{script} 收到参数: {argv}'.format(
  55. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  56. ))
  57. print(' start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date))
  58. exporter = DataxExport(
  59. base_dir=project_root_dir,
  60. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  61. release_user=os.environ['RELEASE_USER'],
  62. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  63. python3_path=os.environ['PYTHON3_PATH'],
  64. datax_home=os.environ['DATAX_HOME'],
  65. log_root_dir=os.environ['LOG_ROOT_DIR'],
  66. )
  67. speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record}
  68. failed = exporter.run(
  69. inis=args.ini,
  70. inis_dirs=args.inis,
  71. start_date=args.start_date,
  72. stop_date=args.stop_date,
  73. host=args.host,
  74. use_random=args.use_random,
  75. parallel=args.parallel,
  76. skip_datax=args.skip_datax,
  77. skip_check=args.skip_check,
  78. speed_overrides=speed_overrides,
  79. )
  80. sys.exit(failed)
  81. if __name__ == '__main__':
  82. main()