Ver Fonte

feat(ds): 加 workflow.py — 工作流查询 / 状态切换领域 helper

抽 4 个 workspace 脚本反复重写的 DS 操作(不预封所有 endpoint):

- find_workflow_by_name / list_workflow_names —— idempotent 跳过用
- get_workflow_full —— PUT 改造前拿真实 task version
- gen_task_codes —— 建工作流前置(传 0 报 code 50036)
- find_schedule_by_workflow —— 二次过滤 totalList 防误命中
- release_workflow / release_schedule —— ONLINE/OFFLINE 切换;
  schedule 走 /online + /offline 两条独立 endpoint

全部走 DSClient(expect_business_ok=True) 业务码兜底。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu há 21 horas atrás
pai
commit
f5861f1e0c
2 ficheiros alterados com 323 adições e 0 exclusões
  1. 117 0
      dw_base/ds/workflow.py
  2. 206 0
      tests/unit/ds/test_workflow.py

+ 117 - 0
dw_base/ds/workflow.py

@@ -0,0 +1,117 @@
+# -*- coding:utf-8 -*-
+"""
+DolphinScheduler 工作流 / schedule 查询 + 状态切换领域 helper。
+
+只抽 workspace 脚本反复重写 ≥ 2 次的操作;不预封所有 endpoint
+(DS 升级 endpoint 全要跟,一次性脚本生命周期短,直接拼 path 干净)。
+
+所有 helper 在调用 DSClient.get / DSClient.post 时都传 expect_business_ok=True,
+业务码 != 0 直接抛 RuntimeError 含完整 response。
+"""
+from typing import Any, Dict, List, Optional
+
+from dw_base.ds.api import DSClient
+
+
+def find_workflow_by_name(client, project_code, name, page_size=200):
+    # type: (DSClient, int, str, int) -> Optional[Dict[str, Any]]
+    """Find a workflow definition by exact name; return None if absent.
+
+    The project's workflow definitions are listed page by page
+    (``page_size`` entries per request) and the first entry whose ``name``
+    equals ``name`` is returned. Paging goes on until a match is found or
+    the server answers with a short or empty page, so a workflow that sits
+    beyond the first page is still found. When the first page is short,
+    exactly one request is made.
+
+    :param client: DS API client exposing
+        ``get(path, params=..., expect_business_ok=...)``.
+    :param project_code: code of the project to search in.
+    :param name: exact workflow name to look for.
+    :param page_size: number of definitions requested per page.
+    :return: the matching entry of ``data.totalList``, or None.
+    """
+    path = '/projects/{}/workflow-definition'.format(project_code)
+    page_no = 1
+    while True:
+        resp = client.get(
+            path,
+            params={
+                'pageNo': page_no,
+                'pageSize': page_size,
+                'searchVal': '',
+            },
+            expect_business_ok=True,
+        )
+        page = resp['data']['totalList']
+        for w in page:
+            if w['name'] == name:
+                return w
+        # A short (or empty) page means there is nothing further to fetch.
+        if not page or len(page) < page_size:
+            return None
+        page_no += 1
+
+
+def list_workflow_names(client, project_code, page_size=200):
+    # type: (DSClient, int, int) -> List[str]
+    """List the names of all workflow definitions in a project.
+
+    Intended for idempotent "skip if it already exists" checks. The
+    definitions are fetched page by page (``page_size`` entries per
+    request) until the server answers with a short or empty page, so
+    projects holding more than ``page_size`` workflows are listed in full.
+    When the first page is short, exactly one request is made.
+
+    :param client: DS API client exposing
+        ``get(path, params=..., expect_business_ok=...)``.
+    :param project_code: code of the project to list.
+    :param page_size: number of definitions requested per page.
+    :return: workflow names in the order the server returned them.
+    """
+    path = '/projects/{}/workflow-definition'.format(project_code)
+    names = []
+    page_no = 1
+    while True:
+        resp = client.get(
+            path,
+            params={
+                'pageNo': page_no,
+                'pageSize': page_size,
+                'searchVal': '',
+            },
+            expect_business_ok=True,
+        )
+        page = resp['data']['totalList']
+        names.extend(w['name'] for w in page)
+        # A short (or empty) page means there is nothing further to fetch.
+        if not page or len(page) < page_size:
+            return names
+        page_no += 1
+
+
+def get_workflow_full(client, project_code, wf_code):
+    # type: (DSClient, int, int) -> Dict[str, Any]
+    """Fetch the complete definition of a single workflow.
+
+    The returned payload is expected to hold ``workflowDefinition``,
+    ``taskDefinitionList`` and ``workflowTaskRelationList``. Call this
+    before a PUT update so that the real task version can be read from
+    ``taskDefinitionList[0]['version']`` rather than hard-coding 0.
+
+    :param client: DS API client exposing ``get(path, expect_business_ok=...)``.
+    :param project_code: code of the project owning the workflow.
+    :param wf_code: code of the workflow definition to fetch.
+    :return: the ``data`` member of the response.
+    """
+    detail_path = '/projects/{}/workflow-definition/{}'.format(
+        project_code, wf_code)
+    payload = client.get(detail_path, expect_business_ok=True)
+    return payload['data']
+
+
+def gen_task_codes(client, project_code, n=1):
+    # type: (DSClient, int, int) -> List[int]
+    """Request ``n`` server-generated task codes via a GET call.
+
+    Run this before creating a workflow: per the original author's note,
+    submitting 0 as a task code is rejected with business code 50036
+    ("check workflow task relation error").
+
+    :param client: DS API client exposing
+        ``get(path, params=..., expect_business_ok=...)``.
+    :param project_code: code of the project the tasks will belong to.
+    :param n: how many codes to request (sent as ``genNum``).
+    :return: the ``data`` member of the response.
+    """
+    endpoint = '/projects/{}/task-definition/gen-task-codes'.format(
+        project_code)
+    query = {'genNum': n}
+    answer = client.get(endpoint, params=query, expect_business_ok=True)
+    return answer['data']
+
+
+def find_schedule_by_workflow(client, project_code, wf_code):
+    # type: (DSClient, int, int) -> Optional[Dict[str, Any]]
+    """Look up the schedule attached to a workflow; None when there is none.
+
+    The schedule list is queried with ``workflowDefinitionCode`` as a
+    filter, and the returned entries are filtered again locally so that an
+    entry belonging to another workflow is never returned by mistake.
+
+    :param client: DS API client exposing
+        ``get(path, params=..., expect_business_ok=...)``.
+    :param project_code: code of the project to search in.
+    :param wf_code: code of the workflow whose schedule is wanted.
+    :return: the first entry of ``data.totalList`` whose
+        ``workflowDefinitionCode`` equals ``wf_code`` (a dict carrying
+        fields such as ``id`` and ``releaseState``), or None.
+    """
+    query = {
+        'pageNo': 1,
+        'pageSize': 100,
+        'searchVal': '',
+        'workflowDefinitionCode': wf_code,
+    }
+    listing = client.get(
+        '/projects/{}/schedules'.format(project_code),
+        params=query,
+        expect_business_ok=True,
+    )
+    candidates = (
+        entry for entry in listing['data']['totalList']
+        if entry.get('workflowDefinitionCode') == wf_code
+    )
+    return next(candidates, None)
+
+
+def release_workflow(client, project_code, wf_code, state):
+    # type: (DSClient, int, int, str) -> Dict[str, Any]
+    """Switch a workflow definition online or offline.
+
+    :param client: DS API client exposing
+        ``post(path, form_data=..., expect_business_ok=...)``.
+    :param project_code: code of the project owning the workflow.
+    :param wf_code: code of the workflow definition to switch.
+    :param state: target state, either ``'ONLINE'`` or ``'OFFLINE'``.
+    :return: whatever ``client.post`` returns.
+    :raises ValueError: if ``state`` is any other value; no request is sent.
+    """
+    allowed_states = ('ONLINE', 'OFFLINE')
+    if state not in allowed_states:
+        message = "state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state)
+        raise ValueError(message)
+    release_path = '/projects/{}/workflow-definition/{}/release'.format(
+        project_code, wf_code)
+    form = {'releaseState': state}
+    return client.post(release_path, form_data=form, expect_business_ok=True)
+
+
+def release_schedule(client, project_code, sched_id, state):
+    # type: (DSClient, int, int, str) -> Dict[str, Any]
+    """Switch a schedule online or offline.
+
+    Online and offline are two separate endpoints,
+    ``/schedules/{id}/online`` and ``/schedules/{id}/offline``; both are
+    posted to with an empty form body.
+
+    :param client: DS API client exposing
+        ``post(path, form_data=..., expect_business_ok=...)``.
+    :param project_code: code of the project owning the schedule.
+    :param sched_id: id of the schedule to switch.
+    :param state: target state, either ``'ONLINE'`` or ``'OFFLINE'``.
+    :return: whatever ``client.post`` returns.
+    :raises ValueError: if ``state`` is any other value; no request is sent.
+    """
+    allowed_states = ('ONLINE', 'OFFLINE')
+    if state not in allowed_states:
+        message = "state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state)
+        raise ValueError(message)
+    # Pick the endpoint that corresponds to the requested state.
+    if state == 'ONLINE':
+        action = 'online'
+    else:
+        action = 'offline'
+    target = '/projects/{}/schedules/{}/{}'.format(
+        project_code, sched_id, action)
+    return client.post(target, form_data={}, expect_business_ok=True)

+ 206 - 0
tests/unit/ds/test_workflow.py

@@ -0,0 +1,206 @@
+# -*- coding:utf-8 -*-
+from unittest.mock import MagicMock
+
+import pytest
+
+from dw_base.ds.workflow import (
+    find_schedule_by_workflow,
+    find_workflow_by_name,
+    gen_task_codes,
+    get_workflow_full,
+    list_workflow_names,
+    release_schedule,
+    release_workflow,
+)
+
+
+PC = 999  # placeholder project code shared by every test below
+
+
+def _make_client():
+    """Return a fresh MagicMock that stands in for the DS client."""
+    stub = MagicMock()
+    return stub
+
+
+# --- find_workflow_by_name ---
+
+def test_find_workflow_by_name_hit():
+    """The entry with the matching name is returned after one list call."""
+    expected = {'name': 'wf_b', 'code': 200}
+    listing = [{'name': 'wf_a', 'code': 100}, dict(expected)]
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': {'totalList': listing}}
+
+    found = find_workflow_by_name(client, PC, 'wf_b')
+
+    assert found == expected
+    expected_params = {'pageNo': 1, 'pageSize': 200, 'searchVal': ''}
+    client.get.assert_called_once_with(
+        '/projects/999/workflow-definition',
+        params=expected_params,
+        expect_business_ok=True,
+    )
+
+
+def test_find_workflow_by_name_miss_returns_none():
+    """An empty listing yields None."""
+    empty_listing = {'code': 0, 'data': {'totalList': []}}
+    client = _make_client()
+    client.get.return_value = empty_listing
+    found = find_workflow_by_name(client, PC, 'wf_x')
+    assert found is None
+
+
+def test_find_workflow_by_name_custom_page_size():
+    """A caller-supplied page_size is forwarded as the pageSize parameter."""
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': {'totalList': []}}
+
+    find_workflow_by_name(client, PC, 'x', page_size=50)
+
+    _, kwargs = client.get.call_args
+    assert kwargs['params']['pageSize'] == 50
+
+
+# --- list_workflow_names ---
+
+def test_list_workflow_names():
+    """Names come back in the order the listing provides them."""
+    listing = [{'name': label} for label in ('a', 'b', 'c')]
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': {'totalList': listing}}
+    names = list_workflow_names(client, PC)
+    assert names == ['a', 'b', 'c']
+
+
+def test_list_workflow_names_empty():
+    """An empty listing yields an empty list of names."""
+    empty_listing = {'code': 0, 'data': {'totalList': []}}
+    client = _make_client()
+    client.get.return_value = empty_listing
+    names = list_workflow_names(client, PC)
+    assert names == []
+
+
+# --- get_workflow_full ---
+
+def test_get_workflow_full():
+    """The response's data payload is returned and the detail path is hit once."""
+    definition = {'code': 100, 'name': 'x'}
+    tasks = [{'code': 1, 'version': 1}]
+    payload = {
+        'workflowDefinition': definition,
+        'taskDefinitionList': tasks,
+        'workflowTaskRelationList': [],
+    }
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': payload}
+
+    fetched = get_workflow_full(client, PC, 100)
+
+    assert fetched == payload
+    client.get.assert_called_once_with(
+        '/projects/999/workflow-definition/100',
+        expect_business_ok=True,
+    )
+
+
+# --- gen_task_codes ---
+
+def test_gen_task_codes_default_one():
+    """Without an explicit n, a single code is requested (genNum=1)."""
+    generated = [171234567890]
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': list(generated)}
+
+    assert gen_task_codes(client, PC) == generated
+    client.get.assert_called_once_with(
+        '/projects/999/task-definition/gen-task-codes',
+        params={'genNum': 1},
+        expect_business_ok=True,
+    )
+
+
+def test_gen_task_codes_multi():
+    """n is forwarded as genNum and the returned codes are passed through."""
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': [1, 2, 3]}
+
+    codes = gen_task_codes(client, PC, n=3)
+
+    assert codes == [1, 2, 3]
+    _, kwargs = client.get.call_args
+    assert kwargs['params']['genNum'] == 3
+
+
+# --- find_schedule_by_workflow ---
+
+def test_find_schedule_hit():
+    """The schedule whose workflowDefinitionCode matches is returned."""
+    schedule = {
+        'id': 5,
+        'workflowDefinitionCode': 100,
+        'releaseState': 'ONLINE',
+    }
+    client = _make_client()
+    client.get.return_value = {'code': 0, 'data': {'totalList': [schedule]}}
+
+    found = find_schedule_by_workflow(client, PC, 100)
+
+    assert found['id'] == 5
+    assert found['releaseState'] == 'ONLINE'
+    expected_params = {
+        'pageNo': 1,
+        'pageSize': 100,
+        'searchVal': '',
+        'workflowDefinitionCode': 100,
+    }
+    client.get.assert_called_once_with(
+        '/projects/999/schedules',
+        params=expected_params,
+        expect_business_ok=True,
+    )
+
+
+def test_find_schedule_miss_returns_none():
+    """An empty schedule listing yields None."""
+    empty_listing = {'code': 0, 'data': {'totalList': []}}
+    client = _make_client()
+    client.get.return_value = empty_listing
+    found = find_schedule_by_workflow(client, PC, 100)
+    assert found is None
+
+
+def test_find_schedule_filters_mismatched_wf_code():
+    """Schedules of other workflows in totalList are skipped (safety re-filter)."""
+    foreign = {'id': 1, 'workflowDefinitionCode': 999}
+    wanted = {'id': 2, 'workflowDefinitionCode': 100}
+    client = _make_client()
+    client.get.return_value = {
+        'code': 0,
+        'data': {'totalList': [foreign, wanted]},
+    }
+    found = find_schedule_by_workflow(client, PC, 100)
+    assert found['id'] == 2
+
+
+# --- release_workflow ---
+
+def test_release_workflow_online():
+    """ONLINE is posted to the release endpoint as releaseState."""
+    client = _make_client()
+    client.post.return_value = {'code': 0}
+
+    release_workflow(client, PC, 100, 'ONLINE')
+
+    expected_form = {'releaseState': 'ONLINE'}
+    client.post.assert_called_once_with(
+        '/projects/999/workflow-definition/100/release',
+        form_data=expected_form,
+        expect_business_ok=True,
+    )
+
+
+def test_release_workflow_offline():
+    """OFFLINE is forwarded as the releaseState form field."""
+    client = _make_client()
+    client.post.return_value = {'code': 0}
+
+    release_workflow(client, PC, 100, 'OFFLINE')
+
+    _, kwargs = client.post.call_args
+    assert kwargs['form_data'] == {'releaseState': 'OFFLINE'}
+
+
+def test_release_workflow_invalid_state_raises():
+    """An unknown state raises ValueError before any request is sent."""
+    client = _make_client()
+    expect_error = pytest.raises(ValueError, match="ONLINE.*OFFLINE")
+    with expect_error:
+        release_workflow(client, PC, 100, 'PAUSE')
+    client.post.assert_not_called()
+
+
+# --- release_schedule ---
+
+def test_release_schedule_online_uses_online_endpoint():
+    """ONLINE posts an empty form to the /online endpoint."""
+    client = _make_client()
+    client.post.return_value = {'code': 0}
+
+    release_schedule(client, PC, 5, 'ONLINE')
+
+    expected_path = '/projects/999/schedules/5/online'
+    client.post.assert_called_once_with(
+        expected_path,
+        form_data={},
+        expect_business_ok=True,
+    )
+
+
+def test_release_schedule_offline_uses_offline_endpoint():
+    """OFFLINE posts to the /offline endpoint."""
+    client = _make_client()
+    client.post.return_value = {'code': 0}
+
+    release_schedule(client, PC, 5, 'OFFLINE')
+
+    args, _ = client.post.call_args
+    assert args[0] == '/projects/999/schedules/5/offline'
+
+
+def test_release_schedule_invalid_state_raises():
+    """An unknown state raises ValueError before any request is sent."""
+    client = _make_client()
+    expect_error = pytest.raises(ValueError, match="ONLINE.*OFFLINE")
+    with expect_error:
+        release_schedule(client, PC, 5, 'PAUSE')
+    client.post.assert_not_called()