#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw/ 场景。 参数集(见 kb/90 §2.6): -ini 单 ini,可多次 -inis ini 目录(非递归扫 *.ini),可多次 -start-date / -stop-date yyyyMMdd -host 显式指定 worker -random 加权随机选 worker -parallel 并行(默认串行) -skip-datax 只生成 json 不执行 -skip-partition 跳过 Hive 分区预建 -backfill 【高级用法】存量回填(见下方注释) -channel / -byte / -record L3 speed 覆盖 """ import argparse import os import sys from datetime import date, timedelta project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root_dir) from dw_base.datax.entry import DataxImport def main(): parser = argparse.ArgumentParser( prog='datax-hive-import-starter', description='DataX hive-import 入口:PG/其他源 → Hive (raw 层)', ) parser.add_argument('-ini', action='append', default=[], metavar='FILE', help='DataX ini 单文件(可多次)') parser.add_argument('-inis', action='append', default=[], metavar='DIR', help='DataX ini 目录,非递归扫 *.ini(可多次)') parser.add_argument('-start-date', default=None, metavar='YYYYMMDD', help='默认昨天(对齐老 sh 行为);-backfill 模式下作为外层回填范围起(含)') parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD', help='默认今天(对齐老 sh 行为);-backfill 模式下作为外层回填范围止(不含)') parser.add_argument('-host', default=None, metavar='HOSTNAME', help='显式指定 worker(优先于 -random)') parser.add_argument('-random', action='store_true', dest='use_random', help='从 conf/workers.ini 加权随机选 worker') parser.add_argument('-parallel', action='store_true', help='并行执行(默认串行);-backfill 模式下表示单日内多 ini 并行(日间仍串行)') parser.add_argument('-skip-datax', action='store_true', help='只生成 json 不执行 datax.py') parser.add_argument('-skip-partition', action='store_true', help='跳过 Hive 分区预建') # ----------------------------------------------------------------------------- # 【高级用法】-backfill 存量回填模式 # # 不传:默认单日语义,-start-date/-stop-date 对应一个 dt 分区(T+1 增量场景,DS 调度) # 传 :-start-date/-stop-date 作外层回填范围(左闭右开),按日循环调用单日逻辑; # 每天独立建分区 + 跑 datax,失败不中断、继续下一天;exit = 失败任务数 # # 典型场景:接入一张新源表,一次性回填历史存量数据(1 个月、半年、N 年) # # 例:7 天范围,2 个 ini(a/b),单日内并行跑两表 # python3 bin/datax-hive-import-starter.py \ # -ini jobs/raw/a.ini -ini jobs/raw/b.ini \ # -start-date 20260420 -stop-date 20260427 \ # -parallel -backfill # 时序:外层 7 天串行(dt=20260420 → dt=20260426),每天内 a/b 并发跑 → # 产物 14 个 Hive 分区(7 天 × 2 表) # ----------------------------------------------------------------------------- parser.add_argument('-backfill', action='store_true', help='【高级用法】存量回填:-start-date/-stop-date 作外层范围按日循环(DS 任务不加此 flag)') # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数) parser.add_argument('-channel', type=int, default=None, help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)') parser.add_argument('-byte', type=int, default=None, help='L3 speed.byte 覆盖(单位 bytes)') parser.add_argument('-record', type=int, default=None, help='L3 speed.record 覆盖') args = parser.parse_args() # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为) if not args.start_date: args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d') if not args.stop_date: args.stop_date = date.today().strftime('%Y%m%d') print('{script} 收到参数: {argv}'.format( script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]), )) print(' start_date={s} stop_date={e} backfill={b}'.format( s=args.start_date, e=args.stop_date, b=args.backfill)) speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record} importer = DataxImport( base_dir=project_root_dir, workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'), release_user=os.environ['RELEASE_USER'], release_root_dir=os.environ['RELEASE_ROOT_DIR'], python3_path=os.environ['PYTHON3_PATH'], datax_home=os.environ['DATAX_HOME'], log_root_dir=os.environ['LOG_ROOT_DIR'], ) if args.backfill: total_failed = importer.backfill( inis=args.ini, inis_dirs=args.inis, start_date=args.start_date, stop_date=args.stop_date, host=args.host, use_random=args.use_random, parallel=args.parallel, skip_partition=args.skip_partition, skip_datax=args.skip_datax, speed_overrides=speed_overrides, ) sys.exit(total_failed) failed = importer.run( inis=args.ini, inis_dirs=args.inis, start_date=args.start_date, stop_date=args.stop_date, host=args.host, use_random=args.use_random, parallel=args.parallel, skip_partition=args.skip_partition, skip_datax=args.skip_datax, speed_overrides=speed_overrides, ) sys.exit(failed) if __name__ == '__main__': main()