소스 검색

feat(ds): 加 builders.py — SHELL task / relations / workflow / schedule body 模板

抽 4 个 workspace 脚本反复重写的样板:

- SHELL_TASK_DEFAULTS 集中项目惯例默认值(workerGroup d1~d3 /
  failRetryTimes 0 / timeoutFlag CLOSE / taskExecuteType BATCH 等)
- build_shell_task:file_param 注入 ${file} localParams;overrides
  覆盖默认值;code/version 可选(POST 创建不传,PUT 必传)
- build_linear_relations:串行 task chain 自动生成入口 + 串接 relations
- build_workflow_body:内部 JSON 序列化 taskDefinitionJson /
  taskRelationJson / locations,返 form-encoded dict
- build_schedule_body:默认百年窗口 + Asia/Shanghai + d1~d3 + bigdata

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 23 시간 전
부모
커밋
680ebbc661
2개의 변경된 파일363개의 추가작업 그리고 0개의 파일을 삭제
  1. 163 0
      dw_base/ds/builders.py
  2. 200 0
      tests/unit/ds/test_builders.py

+ 163 - 0
dw_base/ds/builders.py

@@ -0,0 +1,163 @@
+# -*- coding:utf-8 -*-
+"""
+DolphinScheduler form-encoded body 拼装模板。
+
+抽出 workspace 脚本反复重写的 SHELL task / relations / workflow body /
+schedule body 样板。项目惯例默认值集中在常量字典,特殊场景走 overrides。
+
+不预封 endpoint 调用——纯 dict 工厂。caller 拼好 body 后用
+DSClient.post / put 发出,过 expect_business_ok 收口业务级失败。
+"""
+import json
+from typing import Any, Dict, List, Optional, Sequence, Tuple
+
+
+# 项目惯例 SHELL task 默认值。来源:jobs/raw/* 跑得通的 task 字典共同字段。
+SHELL_TASK_DEFAULTS = {
+    'description': '',
+    'taskType': 'SHELL',
+    'flag': 'YES',
+    'taskPriority': 'MEDIUM',
+    'workerGroup': 'd1~d3',
+    'failRetryTimes': 0,
+    'failRetryInterval': 1,
+    'timeoutFlag': 'CLOSE',
+    'timeoutNotifyStrategy': 'WARN',
+    'timeout': 0,
+    'delayTime': 0,
+    'environmentCode': -1,
+    'taskGroupId': 0,
+    'taskGroupPriority': 0,
+    'cpuQuota': -1,
+    'memoryMax': -1,
+    'taskExecuteType': 'BATCH',
+}
+
+
+def build_shell_task(name, raw_script,
+                     file_param=None, code=None, version=None,
+                     **overrides):
+    # type: (str, str, Optional[str], Optional[int], Optional[int], Any) -> Dict[str, Any]
+    """构造单个 SHELL task 字典。
+
+    - file_param 非空时注入 localParams 一项
+      {prop:'file', direct:'IN', type:'VARCHAR', value:file_param}
+      (兼容现 inc_d / ods / init 工作流的 ${file} 注入约定)
+    - code 用于 task identity(建工作流前先 GET gen-task-codes 拿真实 long);
+      不传则不写入字典(POST 创建场景由 caller 在 main 里赋)
+    - version 用于 PUT 场景 task identity;新建 task 不传,DS 端自动赋 v1
+    - overrides 覆盖默认值(如 workerGroup / timeout / failRetryTimes)
+    """
+    task = dict(SHELL_TASK_DEFAULTS)
+    task['name'] = name
+    local_params = []
+    if file_param is not None:
+        local_params.append({'prop': 'file', 'direct': 'IN',
+                             'type': 'VARCHAR', 'value': file_param})
+    task['taskParams'] = {
+        'localParams': local_params,
+        'rawScript': raw_script,
+        'resourceList': [],
+    }
+    if code is not None:
+        task['code'] = code
+    if version is not None:
+        task['version'] = version
+    task.update(overrides)
+    return task
+
+
+def build_linear_relations(codes_with_versions):
+    # type: (Sequence[Tuple[int, int]]) -> List[Dict[str, Any]]
+    """串行 task chain 自动生成 relations。
+
+    输入 [(code1, v1), (code2, v2), (code3, v3)] →
+        [
+          {pre 0       → post code1 v1},   # 起点(preTaskCode=0 = 入口)
+          {pre code1 v1 → post code2 v2},
+          {pre code2 v2 → post code3 v3},
+        ]
+    新建 task 的 version 通常用 1(DS 端首版即 v1);
+    保留 task 的 version 必须从 GET workflow-definition/{code} 拿真实值,
+    不能硬编 0(实证:硬编 0 PUT 会丢任务,详见 dw_base/ds/api.py docstring)。
+    """
+    if not codes_with_versions:
+        return []
+    relations = [{
+        'name': '', 'preTaskCode': 0, 'preTaskVersion': 0,
+        'postTaskCode': codes_with_versions[0][0],
+        'postTaskVersion': codes_with_versions[0][1],
+        'conditionType': 'NONE', 'conditionParams': {},
+    }]
+    for i in range(1, len(codes_with_versions)):
+        prev_c, prev_v = codes_with_versions[i - 1]
+        cur_c, cur_v = codes_with_versions[i]
+        relations.append({
+            'name': '', 'preTaskCode': prev_c, 'preTaskVersion': prev_v,
+            'postTaskCode': cur_c, 'postTaskVersion': cur_v,
+            'conditionType': 'NONE', 'conditionParams': {},
+        })
+    return relations
+
+
+def build_workflow_body(name, description, tasks, relations,
+                        locations=None,
+                        execution_type='SERIAL_WAIT',
+                        timeout=0,
+                        global_params='[]'):
+    # type: (str, str, List[Dict[str, Any]], List[Dict[str, Any]], Optional[List[Dict[str, Any]]], str, int, str) -> Dict[str, Any]
+    """拼装 POST/PUT /workflow-definition[/{code}] 的 form-encoded body。
+
+    tasks / relations / locations 自动 JSON 序列化为字符串(DS API 要求 string 字段)。
+    locations:
+      - None / [] → 序列化为 '[]',POST 创建场景默认值(DS UI 自动布局)
+      - PUT 改造场景必须传非空 list[{taskCode, x, y}],否则 UI 不渲染节点
+        (详见 dw_base/ds/api.py docstring 与 workspace/20260507/extend_raw_to_ods_workflows.py)
+    """
+    return {
+        'name': name,
+        'description': description or '',
+        'globalParams': global_params,
+        'locations': json.dumps(locations or [], ensure_ascii=False),
+        'timeout': timeout,
+        'executionType': execution_type,
+        'taskDefinitionJson': json.dumps(tasks, ensure_ascii=False),
+        'taskRelationJson': json.dumps(relations, ensure_ascii=False),
+    }
+
+
+def build_schedule_body(workflow_code, crontab,
+                        start_time='2026-01-01 00:00:00',
+                        end_time='2126-01-01 00:00:00',
+                        timezone_id='Asia/Shanghai',
+                        worker_group='d1~d3',
+                        tenant='bigdata',
+                        failure_strategy='END',
+                        warning_type='NONE',
+                        warning_group_id=0,
+                        priority='MEDIUM',
+                        environment_code=-1):
+    # type: (int, str, str, str, str, str, str, str, str, int, str, int) -> Dict[str, Any]
+    """拼装 POST /schedules 的 form-encoded body。
+
+    schedule 字段是嵌套 JSON 字符串:含 crontab + 起止时间 + 时区。
+    默认起止 2026-01-01 ~ 2126-01-01(百年窗口);时区 Asia/Shanghai;
+    worker / tenant 用项目惯例 d1~d3 / bigdata。
+    """
+    schedule_inner = {
+        'startTime': start_time,
+        'endTime': end_time,
+        'timezoneId': timezone_id,
+        'crontab': crontab,
+    }
+    return {
+        'workflowDefinitionCode': workflow_code,
+        'schedule': json.dumps(schedule_inner, ensure_ascii=False),
+        'failureStrategy': failure_strategy,
+        'warningType': warning_type,
+        'warningGroupId': warning_group_id,
+        'workflowInstancePriority': priority,
+        'workerGroup': worker_group,
+        'tenantCode': tenant,
+        'environmentCode': environment_code,
+    }

+ 200 - 0
tests/unit/ds/test_builders.py

@@ -0,0 +1,200 @@
+# -*- coding:utf-8 -*-
+import json
+
+from dw_base.ds.builders import (
+    SHELL_TASK_DEFAULTS,
+    build_linear_relations,
+    build_schedule_body,
+    build_shell_task,
+    build_workflow_body,
+)
+
+
+# --- build_shell_task ---
+
+def test_shell_task_minimal():
+    """name + raw_script 最小调用,默认值齐全。"""
+    t = build_shell_task('first', 'echo hi')
+    assert t['name'] == 'first'
+    assert t['taskParams']['rawScript'] == 'echo hi'
+    assert t['taskParams']['localParams'] == []
+    assert t['taskParams']['resourceList'] == []
+    # 项目惯例默认值
+    assert t['taskType'] == 'SHELL'
+    assert t['workerGroup'] == 'd1~d3'
+    assert t['failRetryTimes'] == 0
+    assert t['timeoutFlag'] == 'CLOSE'
+    assert t['environmentCode'] == -1
+    assert t['taskExecuteType'] == 'BATCH'
+    # 不传 code / version 则不写入字典
+    assert 'code' not in t
+    assert 'version' not in t
+
+
+def test_shell_task_with_file_param():
+    """file_param 注入 localParams。"""
+    t = build_shell_task('foo', 'echo ${file}', file_param='jobs/raw/x.ini')
+    assert t['taskParams']['localParams'] == [
+        {'prop': 'file', 'direct': 'IN', 'type': 'VARCHAR',
+         'value': 'jobs/raw/x.ini'}
+    ]
+
+
+def test_shell_task_with_code_and_version():
+    """code 和 version 是 PUT 场景必传。"""
+    t = build_shell_task('foo', 'echo', code=171234567890, version=2)
+    assert t['code'] == 171234567890
+    assert t['version'] == 2
+
+
+def test_shell_task_overrides_default():
+    """overrides 覆盖默认值(如改 workerGroup 或 timeout)。"""
+    t = build_shell_task('foo', 'echo',
+                         workerGroup='custom-group',
+                         timeout=600,
+                         failRetryTimes=3)
+    assert t['workerGroup'] == 'custom-group'
+    assert t['timeout'] == 600
+    assert t['failRetryTimes'] == 3
+
+
+def test_shell_task_overrides_does_not_mutate_defaults():
+    """重复调用不应共享 dict 引用污染 SHELL_TASK_DEFAULTS。"""
+    t1 = build_shell_task('a', 'echo a', workerGroup='g1')
+    t2 = build_shell_task('b', 'echo b')
+    assert t1['workerGroup'] == 'g1'
+    assert t2['workerGroup'] == 'd1~d3'
+    assert SHELL_TASK_DEFAULTS['workerGroup'] == 'd1~d3'
+
+
+# --- build_linear_relations ---
+
+def test_relations_empty():
+    assert build_linear_relations([]) == []
+
+
+def test_relations_single_task():
+    """单 task = 1 条入口 relation(preTaskCode=0)。"""
+    rels = build_linear_relations([(100, 1)])
+    assert len(rels) == 1
+    assert rels[0]['preTaskCode'] == 0
+    assert rels[0]['preTaskVersion'] == 0
+    assert rels[0]['postTaskCode'] == 100
+    assert rels[0]['postTaskVersion'] == 1
+    assert rels[0]['conditionType'] == 'NONE'
+
+
+def test_relations_chain_two():
+    """双 task = 入口 + 串接 = 2 条 relation。"""
+    rels = build_linear_relations([(100, 1), (200, 1)])
+    assert len(rels) == 2
+    assert rels[0]['postTaskCode'] == 100
+    assert rels[1]['preTaskCode'] == 100
+    assert rels[1]['preTaskVersion'] == 1
+    assert rels[1]['postTaskCode'] == 200
+    assert rels[1]['postTaskVersion'] == 1
+
+
+def test_relations_chain_three_with_real_versions():
+    """三 task 链 + 不同 version 透传。"""
+    rels = build_linear_relations([(100, 3), (200, 2), (300, 1)])
+    assert [r['preTaskCode'] for r in rels] == [0, 100, 200]
+    assert [r['postTaskCode'] for r in rels] == [100, 200, 300]
+    assert [r['preTaskVersion'] for r in rels] == [0, 3, 2]
+    assert [r['postTaskVersion'] for r in rels] == [3, 2, 1]
+
+
+# --- build_workflow_body ---
+
+def test_workflow_body_basic():
+    tasks = [{'code': 100, 'name': 'first'}]
+    rels = [{'postTaskCode': 100}]
+    body = build_workflow_body('wf1', 'desc', tasks, rels)
+    assert body['name'] == 'wf1'
+    assert body['description'] == 'desc'
+    assert body['globalParams'] == '[]'
+    assert body['locations'] == '[]'
+    assert body['timeout'] == 0
+    assert body['executionType'] == 'SERIAL_WAIT'
+    # tasks / relations 序列化为 JSON 字符串
+    assert json.loads(body['taskDefinitionJson']) == tasks
+    assert json.loads(body['taskRelationJson']) == rels
+
+
+def test_workflow_body_with_locations():
+    """locations 非空 list 序列化进 body。"""
+    locs = [{'taskCode': 100, 'x': 380, 'y': 205},
+            {'taskCode': 200, 'x': 680, 'y': 205}]
+    body = build_workflow_body('wf', '', [], [], locations=locs)
+    assert json.loads(body['locations']) == locs
+
+
+def test_workflow_body_none_description():
+    """description=None 兜底为空串(DS 不接受 null)。"""
+    body = build_workflow_body('wf', None, [], [])
+    assert body['description'] == ''
+
+
+def test_workflow_body_overrides():
+    body = build_workflow_body('wf', 'd', [], [],
+                               execution_type='PARALLEL',
+                               timeout=3600,
+                               global_params='[{"prop":"x"}]')
+    assert body['executionType'] == 'PARALLEL'
+    assert body['timeout'] == 3600
+    assert body['globalParams'] == '[{"prop":"x"}]'
+
+
+def test_workflow_body_chinese_no_ascii_escape():
+    """中文 task 不应被 ensure_ascii 编码成 \\uXXXX。"""
+    tasks = [{'name': '订单同步', 'rawScript': 'echo 你好'}]
+    body = build_workflow_body('wf', '描述', tasks, [])
+    assert '订单同步' in body['taskDefinitionJson']
+    assert '你好' in body['taskDefinitionJson']
+
+
+# --- build_schedule_body ---
+
+def test_schedule_body_minimal():
+    body = build_schedule_body(123456, '0 0 6 * * ? *')
+    assert body['workflowDefinitionCode'] == 123456
+    inner = json.loads(body['schedule'])
+    assert inner['crontab'] == '0 0 6 * * ? *'
+    assert inner['startTime'] == '2026-01-01 00:00:00'
+    assert inner['endTime'] == '2126-01-01 00:00:00'
+    assert inner['timezoneId'] == 'Asia/Shanghai'
+    # 项目惯例默认值
+    assert body['workerGroup'] == 'd1~d3'
+    assert body['tenantCode'] == 'bigdata'
+    assert body['failureStrategy'] == 'END'
+    assert body['warningType'] == 'NONE'
+    assert body['warningGroupId'] == 0
+    assert body['workflowInstancePriority'] == 'MEDIUM'
+    assert body['environmentCode'] == -1
+
+
+def test_schedule_body_overrides():
+    body = build_schedule_body(
+        123, '0 0 8 * * ? *',
+        start_time='2026-05-01 00:00:00',
+        end_time='2027-01-01 00:00:00',
+        timezone_id='UTC',
+        worker_group='wg2',
+        tenant='analytics',
+        failure_strategy='CONTINUE',
+        warning_type='EMAIL',
+        warning_group_id=5,
+        priority='HIGH',
+        environment_code=10,
+    )
+    inner = json.loads(body['schedule'])
+    assert inner['startTime'] == '2026-05-01 00:00:00'
+    assert inner['endTime'] == '2027-01-01 00:00:00'
+    assert inner['timezoneId'] == 'UTC'
+    assert body['workerGroup'] == 'wg2'
+    assert body['tenantCode'] == 'analytics'
+    assert body['failureStrategy'] == 'CONTINUE'
+    assert body['warningType'] == 'EMAIL'
+    assert body['warningGroupId'] == 5
+    assert body['workflowInstancePriority'] == 'HIGH'
+    assert body['environmentCode'] == 10