浏览代码

feat(datax): 新增 cli 模块(python -m dw_base.datax.cli gen-json)+ 单测

子命令 gen-json <ini> -start-date -stop-date 内部调 JobConfigGenerator
生成 json 到 conf/datax-json/{job_name}.json;供 runner 本机和远端
ssh 统一调用,替代老 bin/datax-job-config-generator.py shim

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 周之前
父节点
当前提交
952754b4db
共有 2 个文件被更改,包括 77 次插入0 次删除
  1. 44 0
      dw_base/datax/cli.py
  2. 33 0
      tests/unit/datax/test_cli.py

+ 44 - 0
dw_base/datax/cli.py

@@ -0,0 +1,44 @@
+# -*- coding:utf-8 -*-
+"""
+DataX Python 层 CLI 入口(供 runner 本机调用 + 远端 ssh 调用统一用它替代老 bin shim)。
+
+当前子命令:
+  gen-json <ini> -start-date <yyyymmdd> -stop-date <yyyymmdd>
+    读 ini 调 JobConfigGenerator 生成 json 到 conf/datax-json/{job_name}.json
+
+用法:
+  python3 -m dw_base.datax.cli gen-json <ini> -start-date 20260422 -stop-date 20260423
+"""
+import argparse
+import os
+
+from dw_base.datax.job_config_generator import JobConfigGenerator
+from dw_base.datax.path_utils import json_output_path
+
+
+def _cmd_gen_json(args):
+    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
+    out = json_output_path(base_dir, args.ini)
+    os.makedirs(os.path.dirname(out), exist_ok=True)
+    gen = JobConfigGenerator(base_dir, args.ini, args.start_date, args.stop_date, out)
+    gen.run()
+    print('生成 DataX json: {out}'.format(out=out))
+
+
+def main(argv=None):
+    parser = argparse.ArgumentParser(prog='dw_base.datax.cli')
+    sub = parser.add_subparsers(dest='cmd')
+    sub.required = True
+
+    g = sub.add_parser('gen-json', help='ini → DataX json 配置')
+    g.add_argument('ini', help='DataX ini 路径')
+    g.add_argument('-start-date', required=True, dest='start_date', metavar='YYYYMMDD')
+    g.add_argument('-stop-date', required=True, dest='stop_date', metavar='YYYYMMDD')
+    g.set_defaults(func=_cmd_gen_json)
+
+    args = parser.parse_args(argv)
+    args.func(args)
+
+
+if __name__ == '__main__':
+    main()

+ 33 - 0
tests/unit/datax/test_cli.py

@@ -0,0 +1,33 @@
+# -*- coding:utf-8 -*-
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from dw_base.datax import cli
+
+
+@patch('dw_base.datax.cli.JobConfigGenerator')
+def test_gen_json_invokes_generator(MockGen):
+    mock_inst = MagicMock()
+    MockGen.return_value = mock_inst
+    cli.main(['gen-json', 'some.ini',
+              '-start-date', '20260422',
+              '-stop-date', '20260423'])
+    MockGen.assert_called_once()
+    call_args = MockGen.call_args[0]
+    # 位置参数:(base_dir, ini, start, stop, output)
+    assert call_args[1] == 'some.ini'
+    assert call_args[2] == '20260422'
+    assert call_args[3] == '20260423'
+    assert call_args[4].endswith('some.json')
+    mock_inst.run.assert_called_once()
+
+
+def test_gen_json_missing_args_exits():
+    with pytest.raises(SystemExit):
+        cli.main(['gen-json', 'some.ini'])  # 缺 -start-date/-stop-date
+
+
+def test_unknown_subcommand_exits():
+    with pytest.raises(SystemExit):
+        cli.main(['bogus-cmd'])