ソースを参照

feat(ds): DSClient 加 put + 抽 _request + 业务码校验

- put 方法补齐(与 get/post 对称);不再绕 client.session.put 私
  访 _url
- 抽 _request 内部 helper,3 公共方法共用 status / json / SPA
  fallback 抛错(去 30 行复制)
- 加 expect_business_ok 选项:2xx + code==0 才返,否则抛带完整
  response 的 RuntimeError;workspace 脚本 4 份 post_check helper
  可复用此能力消除重复

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 6 時間 前
コミット
7342294fd8
2 ファイル変更217 行追加75 行削除
  1. 56 25
      dw_base/ds/api.py
  2. 161 50
      tests/unit/ds/test_api.py

+ 56 - 25
dw_base/ds/api.py

@@ -5,16 +5,25 @@ DolphinScheduler API 客户端最小封装。
 职责:
 - 读 conf/ds.ini 拿 base_url + token
 - requests.Session 注入 token header
-- 通用 get / post,2xx 返回 json,非 2xx 抛 RuntimeError
+- 通用 get / post / put:2xx 返 JSON;非 2xx 抛 RuntimeError;
+  expect_business_ok=True 时还会检查 DS 业务级 code == 0
 - 不预封任何具体 endpoint,调用方按需拼 path
 
+DS 业务级响应约定:
+- 2xx HTTP + body 形如 {"code": 0, "data": {...}, "msg": "success"}
+- code != 0 表示业务级失败(参数错 / 权限 / 资源冲突);HTTP 仍 2xx
+- expect_business_ok=True 把这两层失败收口到一个 RuntimeError 里
+  含 method + path + 完整 response,caller 不再需要自己写 post_check helper
+
 DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
 - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
   task identity;缺则被当新建 task 处理,原 task 被丢弃
 - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
 - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
   (taskDefinitionList 数据上有,但 DAG 看不到)
-实现参考:workspace/20260507/extend_raw_to_ods_workflows.py(含 verify_checklist 校验)
+工作流 body 拼装:dw_base/ds/builders.py
+工作流查询 / 状态切换:dw_base/ds/workflow.py
+完整使用样板:workspace/20260507/extend_raw_to_ods_workflows.py
 """
 import os
 from configparser import ConfigParser
@@ -28,7 +37,8 @@ _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
 
 
 class DSClient:
-    def __init__(self, conf_path: Optional[str] = None):
+    def __init__(self, conf_path=None):
+        # type: (Optional[str]) -> None
         path = conf_path or _DEFAULT_CONF_PATH
         if not os.path.isfile(path):
             raise RuntimeError('DS 配置文件不存在:' + path)
@@ -39,33 +49,54 @@ class DSClient:
         self.session = requests.Session()
         self.session.headers['token'] = self.token
 
-    def _url(self, path: str) -> str:
+    def _url(self, path):
+        # type: (str) -> str
         return self.base_url + '/' + path.lstrip('/')
 
-    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
-        resp = self.session.get(self._url(path), params=params)
-        if resp.status_code // 100 != 2:
-            raise RuntimeError('GET {} failed rc={}: {}'.format(
-                path, resp.status_code, resp.text))
-        try:
-            return resp.json()
-        except ValueError:
-            raise RuntimeError('GET {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
-                path, resp.text[:200]))
-
-    def post(self, path: str, json_body: Optional[Dict[str, Any]] = None,
-             form_data: Optional[Dict[str, Any]] = None) -> Any:
+    def _request(self, method, path,
+                 params=None, json_body=None, form_data=None,
+                 expect_business_ok=False):
+        # type: (str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
+        """get / post / put 共用。3 层错误统一收口:
+        1. 非 2xx HTTP → RuntimeError 含 method/path/rc/text
+        2. 2xx + 非 JSON(path 错被 SPA fallback 返 HTML)→ RuntimeError 含 raw text 前 200 字符
+        3. expect_business_ok=True 且 dict 响应 code != 0 → RuntimeError 含完整 response
+        """
         if json_body is not None and form_data is not None:
             raise ValueError('json_body 与 form_data 互斥')
+        kwargs = {}
+        if params is not None:
+            kwargs['params'] = params
         if form_data is not None:
-            resp = self.session.post(self._url(path), data=form_data)
-        else:
-            resp = self.session.post(self._url(path), json=json_body)
+            kwargs['data'] = form_data
+        elif json_body is not None:
+            kwargs['json'] = json_body
+        resp = self.session.request(method, self._url(path), **kwargs)
         if resp.status_code // 100 != 2:
-            raise RuntimeError('POST {} failed rc={}: {}'.format(
-                path, resp.status_code, resp.text))
+            raise RuntimeError('{} {} failed rc={}: {}'.format(
+                method, path, resp.status_code, resp.text))
         try:
-            return resp.json()
+            data = resp.json()
         except ValueError:
-            raise RuntimeError('POST {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
-                path, resp.text[:200]))
+            raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
+                method, path, resp.text[:200]))
+        if expect_business_ok and isinstance(data, dict) and data.get('code') != 0:
+            raise RuntimeError('{} {} 业务码 != 0:{}'.format(method, path, data))
+        return data
+
+    def get(self, path, params=None, expect_business_ok=False):
+        # type: (str, Optional[Dict[str, Any]], bool) -> Any
+        return self._request('GET', path, params=params,
+                             expect_business_ok=expect_business_ok)
+
+    def post(self, path, json_body=None, form_data=None, expect_business_ok=False):
+        # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
+        return self._request('POST', path,
+                             json_body=json_body, form_data=form_data,
+                             expect_business_ok=expect_business_ok)
+
+    def put(self, path, json_body=None, form_data=None, expect_business_ok=False):
+        # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
+        return self._request('PUT', path,
+                             json_body=json_body, form_data=form_data,
+                             expect_business_ok=expect_business_ok)

+ 161 - 50
tests/unit/ds/test_api.py

@@ -18,6 +18,18 @@ def client(tmp_path):
     return DSClient(conf_path=str(conf))
 
 
+def _mock_resp(status_code=200, json_data=None, text=''):
+    """构造 requests.Response mock。json_data=None 表示 .json() 抛 ValueError。"""
+    r = MagicMock(status_code=status_code, text=text)
+    if json_data is None:
+        r.json.side_effect = ValueError('not json')
+    else:
+        r.json.return_value = json_data
+    return r
+
+
+# --- 初始化 ---
+
 def test_init_loads_conf(client):
     assert client.base_url == 'http://example/dolphinscheduler'
     assert client.token == 'TEST_TOKEN'
@@ -41,40 +53,48 @@ def test_init_strips_trailing_slash(tmp_path):
     assert c.base_url == 'http://example/dolphinscheduler'
 
 
+# --- GET ---
+
 def test_get_2xx_returns_json(client):
-    with patch.object(client.session, 'get') as mock_get:
-        resp = MagicMock(status_code=200)
-        resp.json.return_value = {'code': 0, 'data': 'ok'}
-        mock_get.return_value = resp
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'code': 0, 'data': 'ok'})
         assert client.get('/projects') == {'code': 0, 'data': 'ok'}
-        mock_get.assert_called_once_with(
-            'http://example/dolphinscheduler/projects', params=None)
+        # 不传 params 时不应在 kwargs 出现 params
+        mock_req.assert_called_once_with(
+            'GET', 'http://example/dolphinscheduler/projects')
+
+
+def test_get_with_params(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {})
+        client.get('/projects', params={'pageNo': 1, 'pageSize': 10})
+        mock_req.assert_called_once_with(
+            'GET', 'http://example/dolphinscheduler/projects',
+            params={'pageNo': 1, 'pageSize': 10})
 
 
 def test_get_strips_leading_slash(client):
     """path 带或不带前导 / 都拼成相同 URL。"""
-    with patch.object(client.session, 'get') as mock_get:
-        resp = MagicMock(status_code=200)
-        resp.json.return_value = {}
-        mock_get.return_value = resp
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {})
         client.get('projects')
-        assert mock_get.call_args[0][0] == 'http://example/dolphinscheduler/projects'
+        assert mock_req.call_args[0][1] == 'http://example/dolphinscheduler/projects'
 
 
 def test_get_non_2xx_raises(client):
-    with patch.object(client.session, 'get') as mock_get:
-        mock_get.return_value = MagicMock(status_code=500, text='Internal Server Error')
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(500, json_data={}, text='Internal Server Error')
+        # 改 status_code 不影响 json mock
+        mock_req.return_value.status_code = 500
         with pytest.raises(RuntimeError, match='GET .*failed rc=500'):
             client.get('/projects')
 
 
 def test_get_2xx_non_json_raises_friendly(client):
-    """新特性:path 错被 SPA fallback 时抛 RuntimeError 含 raw text 前 200 字符。"""
-    with patch.object(client.session, 'get') as mock_get:
-        resp = MagicMock(status_code=200)
-        resp.json.side_effect = ValueError('Expecting value')
-        resp.text = '<!DOCTYPE html><html>fallback</html>'
-        mock_get.return_value = resp
+    """path 错被 SPA fallback 时抛 RuntimeError 含 raw text 前 200 字符。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, json_data=None,
+                                           text='<!DOCTYPE html><html>fallback</html>')
         with pytest.raises(RuntimeError) as exc:
             client.get('/wrong/path')
         msg = str(exc.value)
@@ -84,56 +104,147 @@ def test_get_2xx_non_json_raises_friendly(client):
 
 
 def test_get_2xx_non_json_truncates_to_200(client):
-    with patch.object(client.session, 'get') as mock_get:
-        resp = MagicMock(status_code=200)
-        resp.json.side_effect = ValueError()
-        resp.text = 'X' * 500
-        mock_get.return_value = resp
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, json_data=None, text='X' * 500)
         with pytest.raises(RuntimeError) as exc:
             client.get('/p')
-        # 错误信息含 200 个 X,不含 500 个
         assert 'X' * 200 in str(exc.value)
         assert 'X' * 201 not in str(exc.value)
 
 
-def test_post_2xx_returns_json(client):
-    with patch.object(client.session, 'post') as mock_post:
-        resp = MagicMock(status_code=200)
-        resp.json.return_value = {'ok': True}
-        mock_post.return_value = resp
+# --- POST ---
+
+def test_post_json_body(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'ok': True})
         assert client.post('/foo', json_body={'k': 'v'}) == {'ok': True}
-        mock_post.assert_called_once_with(
-            'http://example/dolphinscheduler/foo', json={'k': 'v'})
+        mock_req.assert_called_once_with(
+            'POST', 'http://example/dolphinscheduler/foo', json={'k': 'v'})
+
+
+def test_post_form_data_uses_data_kwarg(client):
+    """form_data 走 requests data= kwarg(form-encoded)。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'ok': True})
+        client.post('/foo', form_data={'name': 'x', 'desc': 'hi'})
+        mock_req.assert_called_once_with(
+            'POST', 'http://example/dolphinscheduler/foo',
+            data={'name': 'x', 'desc': 'hi'})
+
+
+def test_post_no_body(client):
+    """不传 body 时 kwargs 不应有 data / json。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {})
+        client.post('/foo')
+        mock_req.assert_called_once_with(
+            'POST', 'http://example/dolphinscheduler/foo')
 
 
 def test_post_2xx_non_json_raises_friendly(client):
-    with patch.object(client.session, 'post') as mock_post:
-        resp = MagicMock(status_code=200)
-        resp.json.side_effect = ValueError()
-        resp.text = 'NOT JSON'
-        mock_post.return_value = resp
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, json_data=None, text='NOT JSON')
         with pytest.raises(RuntimeError, match='POST.*返回非 JSON'):
             client.post('/foo')
 
 
 def test_post_non_2xx_raises(client):
-    with patch.object(client.session, 'post') as mock_post:
-        mock_post.return_value = MagicMock(status_code=403, text='Forbidden')
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(403, {}, text='Forbidden')
+        mock_req.return_value.status_code = 403
         with pytest.raises(RuntimeError, match='POST .*failed rc=403'):
             client.post('/foo')
 
 
-def test_post_form_data_uses_data_kwarg(client):
-    """form_data 走 requests data= kwarg(form-encoded),不走 json="""
-    with patch.object(client.session, 'post') as mock_post:
-        resp = MagicMock(status_code=200)
-        resp.json.return_value = {'ok': True}
-        mock_post.return_value = resp
-        client.post('/foo', form_data={'name': 'x', 'desc': 'hi'})
-        mock_post.assert_called_once_with(
-            'http://example/dolphinscheduler/foo', data={'name': 'x', 'desc': 'hi'})
-
-
 def test_post_json_and_form_mutex(client):
     with pytest.raises(ValueError, match='互斥'):
         client.post('/foo', json_body={'k': 'v'}, form_data={'k': 'v'})
+
+
+# --- PUT(与 POST 对称) ---
+
+def test_put_json_body(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'ok': True})
+        assert client.put('/foo/1', json_body={'k': 'v'}) == {'ok': True}
+        mock_req.assert_called_once_with(
+            'PUT', 'http://example/dolphinscheduler/foo/1', json={'k': 'v'})
+
+
+def test_put_form_data(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'ok': True})
+        client.put('/foo/1', form_data={'name': 'x'})
+        mock_req.assert_called_once_with(
+            'PUT', 'http://example/dolphinscheduler/foo/1', data={'name': 'x'})
+
+
+def test_put_non_2xx_raises(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(400, {}, text='Bad Request')
+        mock_req.return_value.status_code = 400
+        with pytest.raises(RuntimeError, match='PUT .*failed rc=400'):
+            client.put('/foo/1')
+
+
+def test_put_2xx_non_json_raises(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, json_data=None, text='HTML')
+        with pytest.raises(RuntimeError, match='PUT.*返回非 JSON'):
+            client.put('/foo/1')
+
+
+def test_put_json_and_form_mutex(client):
+    with pytest.raises(ValueError, match='互斥'):
+        client.put('/foo/1', json_body={'k': 'v'}, form_data={'k': 'v'})
+
+
+# --- expect_business_ok:业务码 != 0 抛错 ---
+
+def test_business_ok_pass_when_code_zero(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'code': 0, 'data': {'id': 1}})
+        result = client.post('/foo', form_data={}, expect_business_ok=True)
+        assert result == {'code': 0, 'data': {'id': 1}}
+
+
+def test_business_ok_raises_when_code_nonzero(client):
+    """2xx + code != 0 抛 RuntimeError 含完整 response。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(
+            200, {'code': 10105, 'msg': "Required request parameter 'name' is not present"})
+        with pytest.raises(RuntimeError) as exc:
+            client.post('/foo', form_data={}, expect_business_ok=True)
+        msg = str(exc.value)
+        assert 'POST /foo' in msg
+        assert '业务码 != 0' in msg
+        assert '10105' in msg
+
+
+def test_business_ok_off_default_passes_nonzero_code(client):
+    """expect_business_ok=False(默认)时 code != 0 不抛错,原样返。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'code': 50036, 'msg': 'oops'})
+        result = client.post('/foo', form_data={})
+        assert result == {'code': 50036, 'msg': 'oops'}
+
+
+def test_business_ok_works_for_get(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'code': 1, 'msg': 'fail'})
+        with pytest.raises(RuntimeError, match='GET .*业务码 != 0'):
+            client.get('/projects', expect_business_ok=True)
+
+
+def test_business_ok_works_for_put(client):
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, {'code': 7, 'msg': 'denied'})
+        with pytest.raises(RuntimeError, match='PUT .*业务码 != 0'):
+            client.put('/foo/1', form_data={}, expect_business_ok=True)
+
+
+def test_business_ok_skips_non_dict_response(client):
+    """响应不是 dict(罕见,如 list)时 business_ok 不应抛错。"""
+    with patch.object(client.session, 'request') as mock_req:
+        mock_req.return_value = _mock_resp(200, [1, 2, 3])
+        assert client.get('/p', expect_business_ok=True) == [1, 2, 3]