datax-hive-import-starter.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw/ 场景。
  5. 参数集(见 kb/90 §2.6):
  6. -ini <file> 单 ini,可多次
  7. -inis <dir> ini 目录(非递归扫 *.ini),可多次
  8. -start-date / -stop-date yyyyMMdd
  9. -host <hostname> 显式指定 worker
  10. -random 加权随机选 worker
  11. -parallel 并行(默认串行)
  12. -skip-datax 只生成 json 不执行
  13. -skip-partition 跳过 Hive 分区预建
  14. -backfill 【高级用法】存量回填(见下方注释)
  15. -channel / -byte / -record L3 speed 覆盖
  16. """
  17. import argparse
  18. import os
  19. import sys
  20. from datetime import date, timedelta
  21. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  22. sys.path.append(project_root_dir)
  23. from dw_base.datax.entry import DataxImport
  24. def main():
  25. parser = argparse.ArgumentParser(
  26. prog='datax-hive-import-starter',
  27. description='DataX hive-import 入口:PG/其他源 → Hive (raw 层)',
  28. )
  29. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  30. help='DataX ini 单文件(可多次)')
  31. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  32. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  33. parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
  34. help='默认昨天(对齐老 sh 行为);-backfill 模式下作为外层回填范围起(含)')
  35. parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
  36. help='默认今天(对齐老 sh 行为);-backfill 模式下作为外层回填范围止(不含)')
  37. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  38. help='显式指定 worker(优先于 -random)')
  39. parser.add_argument('-random', action='store_true', dest='use_random',
  40. help='从 conf/workers.ini 加权随机选 worker')
  41. parser.add_argument('-parallel', action='store_true',
  42. help='并行执行(默认串行);-backfill 模式下表示单日内多 ini 并行(日间仍串行)')
  43. parser.add_argument('-skip-datax', action='store_true',
  44. help='只生成 json 不执行 datax.py')
  45. parser.add_argument('-skip-partition', action='store_true',
  46. help='跳过 Hive 分区预建')
  47. # -----------------------------------------------------------------------------
  48. # 【高级用法】-backfill 存量回填模式
  49. #
  50. # 不传:默认单日语义,-start-date/-stop-date 对应一个 dt 分区(T+1 增量场景,DS 调度)
  51. # 传 :-start-date/-stop-date 作外层回填范围(左闭右开),按日循环调用单日逻辑;
  52. # 每天独立建分区 + 跑 datax,失败不中断、继续下一天;exit = 失败任务数
  53. #
  54. # 典型场景:接入一张新源表,一次性回填历史存量数据(1 个月、半年、N 年)
  55. #
  56. # 例:7 天范围,2 个 ini(a/b),单日内并行跑两表
  57. # python3 bin/datax-hive-import-starter.py \
  58. # -ini jobs/raw/a.ini -ini jobs/raw/b.ini \
  59. # -start-date 20260420 -stop-date 20260427 \
  60. # -parallel -backfill
  61. # 时序:外层 7 天串行(dt=20260420 → dt=20260426),每天内 a/b 并发跑 →
  62. # 产物 14 个 Hive 分区(7 天 × 2 表)
  63. # -----------------------------------------------------------------------------
  64. parser.add_argument('-backfill', action='store_true',
  65. help='【高级用法】存量回填:-start-date/-stop-date 作外层范围按日循环(DS 任务不加此 flag)')
  66. # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数)
  67. parser.add_argument('-channel', type=int, default=None,
  68. help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
  69. parser.add_argument('-byte', type=int, default=None,
  70. help='L3 speed.byte 覆盖(单位 bytes)')
  71. parser.add_argument('-record', type=int, default=None,
  72. help='L3 speed.record 覆盖')
  73. args = parser.parse_args()
  74. # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
  75. if not args.start_date:
  76. args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  77. if not args.stop_date:
  78. args.stop_date = date.today().strftime('%Y%m%d')
  79. print('{script} 收到参数: {argv}'.format(
  80. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  81. ))
  82. print(' start_date={s} stop_date={e} backfill={b}'.format(
  83. s=args.start_date, e=args.stop_date, b=args.backfill))
  84. speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record}
  85. importer = DataxImport(
  86. base_dir=project_root_dir,
  87. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  88. release_user=os.environ['RELEASE_USER'],
  89. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  90. python3_path=os.environ['PYTHON3_PATH'],
  91. datax_home=os.environ['DATAX_HOME'],
  92. log_root_dir=os.environ['LOG_ROOT_DIR'],
  93. )
  94. if args.backfill:
  95. total_failed = importer.backfill(
  96. inis=args.ini,
  97. inis_dirs=args.inis,
  98. start_date=args.start_date,
  99. stop_date=args.stop_date,
  100. host=args.host,
  101. use_random=args.use_random,
  102. parallel=args.parallel,
  103. skip_partition=args.skip_partition,
  104. skip_datax=args.skip_datax,
  105. speed_overrides=speed_overrides,
  106. )
  107. sys.exit(total_failed)
  108. failed = importer.run(
  109. inis=args.ini,
  110. inis_dirs=args.inis,
  111. start_date=args.start_date,
  112. stop_date=args.stop_date,
  113. host=args.host,
  114. use_random=args.use_random,
  115. parallel=args.parallel,
  116. skip_partition=args.skip_partition,
  117. skip_datax=args.skip_datax,
  118. speed_overrides=speed_overrides,
  119. )
  120. sys.exit(failed)
  121. if __name__ == '__main__':
  122. main()