# -*- coding:utf-8 -*-
"""
CLI entry point for the DataX Python layer.

Meant to be the single entry point for both local runner invocations and
remote ssh invocations, replacing the old bin shim.

Current subcommands:
  gen-json <ini> -start-date <YYYYMMDD> -stop-date <YYYYMMDD> [-channel N] [-byte N] [-record N]
      Read the ini and call JobConfigGenerator to generate the json at
      conf/datax-json/{job_name}.json.
      -channel/-byte/-record are the L3 CLI speed overrides
      (precedence: L1 conf < L2 ini [speed] < L3 CLI).

Usage:
  python3 -m dw_base.datax.cli gen-json <ini> -start-date 20260422 -stop-date 20260423 \
      -channel 20 -byte 20971520 -record 50000
"""
import argparse
import os

from dw_base.datax.job_config_generator import JobConfigGenerator
from dw_base.datax.path_utils import json_output_path


def _cmd_gen_json(args):
    """Handle the ``gen-json`` subcommand.

    Resolves the base directory (two levels above this file's directory),
    asks ``json_output_path`` for the output location, makes sure its parent
    directory exists, runs ``JobConfigGenerator`` and prints the output path.

    Args:
        args: Parsed argparse namespace carrying ``ini``, ``start_date``,
            ``stop_date``, ``channel``, ``byte`` and ``record``.
    """
    here = os.path.dirname(__file__)
    project_root = os.path.abspath(os.path.join(here, os.pardir, os.pardir))

    target = json_output_path(project_root, args.ini)
    os.makedirs(os.path.dirname(target), exist_ok=True)

    # All three keys are always present; a value of None means the flag was
    # not given on the command line.
    # NOTE(review): presumably JobConfigGenerator falls back to the ini/conf
    # value for None entries - confirm against its implementation.
    speed_overrides = {
        key: getattr(args, key) for key in ('channel', 'byte', 'record')
    }

    generator = JobConfigGenerator(
        project_root,
        args.ini,
        args.start_date,
        args.stop_date,
        target,
        cli_speed_overrides=speed_overrides,
    )
    generator.run()
    print('生成 DataX json: {out}'.format(out=target))


def main(argv=None):
    """Parse the command line and dispatch to the selected subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(prog='dw_base.datax.cli')
    subcommands = parser.add_subparsers(dest='cmd')
    # Set as an attribute after creation so that omitting the subcommand is
    # reported as an error.
    subcommands.required = True

    gen_json = subcommands.add_parser('gen-json', help='ini → DataX json 配置')
    gen_json.add_argument('ini', help='DataX ini 路径')

    # Required date range boundaries.
    for flag, dest in (('-start-date', 'start_date'), ('-stop-date', 'stop_date')):
        gen_json.add_argument(flag, required=True, dest=dest, metavar='YYYYMMDD')

    # Optional integer speed overrides; they stay None when not passed.
    speed_options = (
        ('-channel', 'L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)'),
        ('-byte', 'L3 speed.byte 覆盖(单位 bytes)'),
        ('-record', 'L3 speed.record 覆盖'),
    )
    for flag, help_text in speed_options:
        gen_json.add_argument(flag, type=int, default=None, help=help_text)

    gen_json.set_defaults(func=_cmd_gen_json)

    parsed = parser.parse_args(argv)
    parsed.func(parsed)


if __name__ == '__main__':
    main()