cli.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # -*- coding:utf-8 -*-
  2. """
CLI entry point for the DataX Python layer. Both local runner invocations and
remote ssh invocations go through it, replacing the old bin shim.

Current subcommands:
    gen-json <ini> -start-date <yyyymmdd> -stop-date <yyyymmdd>
             [-channel N] [-byte N] [-record N]
        Reads the ini and calls JobConfigGenerator to write the json to
        conf/datax-json/{job_name}.json.
        -channel/-byte/-record are the L3 CLI speed overrides
        (precedence: L1 conf < L2 ini [speed] < L3 CLI).

Usage:
    python3 -m dw_base.datax.cli gen-json <ini> -start-date 20260422 -stop-date 20260423 \
        -channel 20 -byte 20971520 -record 50000
  12. """
  13. import argparse
  14. import os
  15. from dw_base.datax.job_config_generator import JobConfigGenerator
  16. from dw_base.datax.path_utils import json_output_path
  17. def _cmd_gen_json(args):
  18. base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
  19. out = json_output_path(base_dir, args.ini)
  20. os.makedirs(os.path.dirname(out), exist_ok=True)
  21. cli_speed_overrides = {
  22. 'channel': args.channel,
  23. 'byte': args.byte,
  24. 'record': args.record,
  25. }
  26. gen = JobConfigGenerator(base_dir, args.ini, args.start_date, args.stop_date, out,
  27. cli_speed_overrides=cli_speed_overrides)
  28. gen.run()
  29. print('生成 DataX json: {out}'.format(out=out))
  30. def main(argv=None):
  31. parser = argparse.ArgumentParser(prog='dw_base.datax.cli')
  32. sub = parser.add_subparsers(dest='cmd')
  33. sub.required = True
  34. g = sub.add_parser('gen-json', help='ini → DataX json 配置')
  35. g.add_argument('ini', help='DataX ini 路径')
  36. g.add_argument('-start-date', required=True, dest='start_date', metavar='YYYYMMDD')
  37. g.add_argument('-stop-date', required=True, dest='stop_date', metavar='YYYYMMDD')
  38. g.add_argument('-channel', type=int, default=None,
  39. help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
  40. g.add_argument('-byte', type=int, default=None,
  41. help='L3 speed.byte 覆盖(单位 bytes)')
  42. g.add_argument('-record', type=int, default=None,
  43. help='L3 speed.record 覆盖')
  44. g.set_defaults(func=_cmd_gen_json)
  45. args = parser.parse_args(argv)
  46. args.func(args)
# Run the CLI only when this module is executed as the main program
# (e.g. ``python3 -m dw_base.datax.cli``), not when it is imported.
if __name__ == '__main__':
    main()