Browse Source

feat(ds): conf/ds.ini 加 [projects] 段 + cli 加 put 子命令

- conf/ds.ini 新增 [projects] 段(poyee-data-warehouse / manual /
  test 三项目码),项目码不算高敏,入库
- DSClient.project_code(name) 查映射;未注册抛 RuntimeError 提示
  加 conf;workspace 脚本不再硬编 magic number
- [projects] 段可选(向后兼容旧 ds.ini)
- cli 加 put 子命令(与 post 对称):-j JSON / -d k=v 互斥 + -o 写文件
- post / put 共用 body 解析与互斥校验

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 2 days ago
parent
commit
a4de3ae3ff
5 changed files with 146 additions and 5 deletions
  1. 7 0
      conf/ds.ini
  2. 20 1
      dw_base/ds/api.py
  3. 15 4
      dw_base/ds/cli.py
  4. 48 0
      tests/unit/ds/test_api.py
  5. 56 0
      tests/unit/ds/test_cli.py

+ 7 - 0
conf/ds.ini

@@ -3,7 +3,14 @@
 ;
 ; base_url:DS 3.4.1 默认 API 端口 12345 + path /dolphinscheduler
 ; token:admin 令牌,权限顶级;后续按需迁 datasource/ds/ 不入库
+; [projects]:项目名 → projectCode 映射;DSClient.project_code(name) 查
+;   workspace 脚本不再硬编 magic number;项目码不算高敏,入库
 
 [dolphinscheduler]
 base_url = http://100.64.0.61:12345/dolphinscheduler
 token = 921755ce53b15c74d417978802eda884
+
+[projects]
+poyee-data-warehouse = 171673813105472
+manual = 171673781143360
+test = 171965331686208

+ 20 - 1
dw_base/ds/api.py

@@ -3,10 +3,11 @@
 DolphinScheduler API 客户端最小封装。
 
 职责:
-- 读 conf/ds.ini 拿 base_url + token
+- 读 conf/ds.ini 拿 base_url + token + 项目码映射
 - requests.Session 注入 token header
 - 通用 get / post / put:2xx 返 JSON;非 2xx 抛 RuntimeError;
   expect_business_ok=True 时还会检查 DS 业务级 code == 0
+- project_code(name) 查 conf [projects] 段,避免 workspace 脚本硬编 magic number
 - 不预封任何具体 endpoint,调用方按需拼 path
 
 DS 业务级响应约定:
@@ -46,6 +47,11 @@ class DSClient:
         cp.read(path, encoding='utf-8')
         self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
         self.token = cp.get('dolphinscheduler', 'token')
+        # [projects] 段可选(向后兼容旧 ds.ini);存在则读为 {name: code} dict
+        self._projects = {}  # type: Dict[str, int]
+        if cp.has_section('projects'):
+            for name, code_str in cp.items('projects'):
+                self._projects[name] = int(code_str)
         self.session = requests.Session()
         self.session.headers['token'] = self.token
 
@@ -53,6 +59,19 @@ class DSClient:
         # type: (str) -> str
         return self.base_url + '/' + path.lstrip('/')
 
+    def project_code(self, name):
+        # type: (str) -> int
+        """查 conf/ds.ini [projects] 段拿项目码。
+
+        未注册抛 RuntimeError 提示加 conf。避免 workspace 脚本硬编
+        magic number;项目码不算高敏,入库 ds.ini。
+        """
+        if name not in self._projects:
+            raise RuntimeError(
+                "项目码未注册:'{}';conf/ds.ini [projects] 段加 '{} = <code>'".format(
+                    name, name))
+        return self._projects[name]
+
     def _request(self, method, path,
                  params=None, json_body=None, form_data=None,
                  expect_business_ok=False):

+ 15 - 4
dw_base/ds/cli.py

@@ -3,12 +3,14 @@
 DolphinScheduler API CLI 入口。
 
 用法:
-  python -m dw_base.ds.cli get <path> [-p k=v ...]
-  python -m dw_base.ds.cli post <path> [-j '<json-body>']
+  python -m dw_base.ds.cli get  <path> [-p k=v ...]
+  python -m dw_base.ds.cli post <path> [-j '<json-body>' | -d k=v ...]
+  python -m dw_base.ds.cli put  <path> [-j '<json-body>' | -d k=v ...]
 
 示例:
   python -m dw_base.ds.cli get /projects -p pageSize=10 -p pageNo=1
-  python -m dw_base.ds.cli post /projects/123/process-definition/query -j '{}'
+  python -m dw_base.ds.cli post /projects/123/workflow-definition -d name=wf -d desc=hi
+  python -m dw_base.ds.cli put /projects/123/workflow-definition/100 -j '{"name":"wf"}'
 """
 import argparse
 import json
@@ -45,18 +47,27 @@ def main(argv=None):
                    help='form-data 参数(可多次,DS 3.x 多用 form-encoded;与 -j 互斥)')
     p.add_argument('-o', dest='output', default=None, metavar='FILE', help='写 JSON 到文件(不打 stdout)')
 
+    u = sub.add_parser('put', help='HTTP PUT(如 workflow-definition 改造)')
+    u.add_argument('path')
+    u.add_argument('-j', dest='body', default=None, metavar='JSON', help='JSON body(与 -d 互斥)')
+    u.add_argument('-d', dest='form', action='append', default=[], metavar='k=v',
+                   help='form-data 参数(可多次;与 -j 互斥)')
+    u.add_argument('-o', dest='output', default=None, metavar='FILE', help='写 JSON 到文件(不打 stdout)')
+
     args = parser.parse_args(argv)
     client = DSClient()
 
     if args.cmd == 'get':
         result = client.get(args.path, params=_parse_kv(args.p))
     else:
+        # post / put 共用 body / form 解析与互斥校验
         body = json.loads(args.body) if args.body else None
         form = _parse_kv(args.form) if args.form else None
         if body is not None and form is not None:
             sys.stderr.write('-j 与 -d 互斥\n')
             sys.exit(1)
-        result = client.post(args.path, json_body=body, form_data=form)
+        method = client.put if args.cmd == 'put' else client.post
+        result = method(args.path, json_body=body, form_data=form)
 
     text = json.dumps(result, ensure_ascii=False, indent=2)
     if args.output:

+ 48 - 0
tests/unit/ds/test_api.py

@@ -248,3 +248,51 @@ def test_business_ok_skips_non_dict_response(client):
     with patch.object(client.session, 'request') as mock_req:
         mock_req.return_value = _mock_resp(200, [1, 2, 3])
         assert client.get('/p', expect_business_ok=True) == [1, 2, 3]
+
+
+# --- project_code([projects] 段映射)---
+
+def test_project_code_loaded_from_conf(tmp_path):
+    conf = tmp_path / 'ds.ini'
+    conf.write_text(
+        '[dolphinscheduler]\n'
+        'base_url = http://example/dolphinscheduler\n'
+        'token = T\n'
+        '\n'
+        '[projects]\n'
+        'poyee-data-warehouse = 171673813105472\n'
+        'manual = 171673781143360\n'
+        'test = 171965331686208\n',
+        encoding='utf-8',
+    )
+    c = DSClient(conf_path=str(conf))
+    assert c.project_code('poyee-data-warehouse') == 171673813105472
+    assert c.project_code('manual') == 171673781143360
+    assert c.project_code('test') == 171965331686208
+
+
+def test_project_code_unknown_name_raises(tmp_path):
+    conf = tmp_path / 'ds.ini'
+    conf.write_text(
+        '[dolphinscheduler]\n'
+        'base_url = http://example/dolphinscheduler\n'
+        'token = T\n'
+        '[projects]\n'
+        'foo = 1\n',
+        encoding='utf-8',
+    )
+    c = DSClient(conf_path=str(conf))
+    with pytest.raises(RuntimeError) as exc:
+        c.project_code('bar')
+    msg = str(exc.value)
+    assert '项目码未注册' in msg
+    assert 'bar' in msg
+    assert '[projects]' in msg
+
+
+def test_project_code_section_optional(client):
+    """[projects] 段缺失时不报错;查询任何 name 抛 '未注册'。"""
+    # fixture 的 ds.ini 没有 [projects] 段
+    assert client._projects == {}
+    with pytest.raises(RuntimeError, match='项目码未注册'):
+        client.project_code('any')

+ 56 - 0
tests/unit/ds/test_cli.py

@@ -97,3 +97,59 @@ def test_kv_invalid_format_skipped(capsys):
         cli.main(['get', '/p', '-p', 'no_equals', '-p', 'k=v'])
         inst.get.assert_called_once_with('/p', params={'k': 'v'})
         assert '需要 k=v 格式' in capsys.readouterr().err
+
+
+# --- put 子命令(与 post 对称) ---
+
+@patch('dw_base.ds.cli.DSClient')
+def test_put_with_json_body(MockClient, capsys):
+    inst = MagicMock()
+    inst.put.return_value = {'ok': True}
+    MockClient.return_value = inst
+
+    cli.main(['put', '/foo/1', '-j', '{"a":1}'])
+
+    inst.put.assert_called_once_with('/foo/1', json_body={'a': 1}, form_data=None)
+
+
+@patch('dw_base.ds.cli.DSClient')
+def test_put_with_form_data(MockClient, capsys):
+    inst = MagicMock()
+    inst.put.return_value = {'ok': True}
+    MockClient.return_value = inst
+
+    cli.main(['put', '/foo/1', '-d', 'name=x', '-d', 'desc=hi'])
+
+    inst.put.assert_called_once_with(
+        '/foo/1', json_body=None, form_data={'name': 'x', 'desc': 'hi'})
+
+
+@patch('dw_base.ds.cli.DSClient')
+def test_put_no_body(MockClient, capsys):
+    inst = MagicMock()
+    inst.put.return_value = {}
+    MockClient.return_value = inst
+
+    cli.main(['put', '/foo/1'])
+
+    inst.put.assert_called_once_with('/foo/1', json_body=None, form_data=None)
+
+
+def test_put_j_and_d_mutex_exits():
+    with pytest.raises(SystemExit):
+        cli.main(['put', '/foo/1', '-j', '{}', '-d', 'k=v'])
+
+
+@patch('dw_base.ds.cli.DSClient')
+def test_put_o_writes_file(MockClient, tmp_path, capsys):
+    inst = MagicMock()
+    inst.put.return_value = {'code': 0}
+    MockClient.return_value = inst
+
+    out_file = tmp_path / 'out.json'
+    cli.main(['put', '/foo/1', '-d', 'k=v', '-o', str(out_file)])
+
+    assert json.loads(out_file.read_text(encoding='utf-8')) == {'code': 0}
+    captured = capsys.readouterr()
+    assert captured.out == ''
+    assert '已写到' in captured.err