# -*- coding:utf-8 -*-
"""
DolphinScheduler form-encoded body templates.

Factors out the SHELL task / relations / workflow body / schedule body
boilerplate that workspace scripts kept rewriting. Project-convention defaults
live in module-level constant dicts; special cases go through ``overrides``.

Endpoint calls are deliberately NOT wrapped here -- these are pure dict
factories. The caller assembles the body, sends it with DSClient.post / put,
and passes the response through expect_business_ok to surface business-level
failures.
"""
import json
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union


# Project-convention SHELL task defaults. Source: the fields shared by the
# working task dicts under jobs/raw/*.
SHELL_TASK_DEFAULTS = {
    'description': '',
    'taskType': 'SHELL',
    'flag': 'YES',
    'taskPriority': 'MEDIUM',
    'workerGroup': 'd1~d3',
    'failRetryTimes': 0,
    'failRetryInterval': 1,
    'timeoutFlag': 'CLOSE',
    'timeoutNotifyStrategy': 'WARN',
    'timeout': 0,
    'delayTime': 0,
    'environmentCode': -1,
    'taskGroupId': 0,
    'taskGroupPriority': 0,
    'cpuQuota': -1,
    'memoryMax': -1,
    'taskExecuteType': 'BATCH',
}


def build_shell_task(name, raw_script, file_param=None, code=None, version=None,
                     **overrides):
    # type: (str, str, Optional[str], Optional[int], Optional[int], **Any) -> Dict[str, Any]
    """Build a single SHELL task dict.

    - file_param: when not None, one localParams entry
      {prop:'file', direct:'IN', type:'VARCHAR', value:file_param} is injected
      (compatible with the ${file} injection convention of the existing
      inc_d / ods / init workflows).
    - code: task identity (before creating a workflow, GET gen-task-codes to
      obtain the real long codes). When None it is not written into the dict
      (in the POST-create scenario the caller assigns it in main).
    - version: task identity for PUT scenarios. Omit it for new tasks; DS
      assigns v1 server-side.
    - overrides: override the defaults (e.g. workerGroup / timeout /
      failRetryTimes). They are applied last, so they can also replace
      name, taskParams, code or version.

    Returns a fresh dict; SHELL_TASK_DEFAULTS itself is never mutated.
    """
    task = dict(SHELL_TASK_DEFAULTS)
    task['name'] = name
    local_params = []
    if file_param is not None:
        local_params.append({'prop': 'file', 'direct': 'IN', 'type': 'VARCHAR', 'value': file_param})
    task['taskParams'] = {
        'localParams': local_params,
        'rawScript': raw_script,
        'resourceList': [],
    }
    if code is not None:
        task['code'] = code
    if version is not None:
        task['version'] = version
    # Applied last on purpose: caller overrides win over every field above.
    task.update(overrides)
    return task


def _build_relation(pre_code, pre_version, post_code, post_version):
    # type: (int, int, int, int) -> Dict[str, Any]
    """Build one unconditional (conditionType NONE) relation edge pre -> post."""
    return {
        'name': '',
        'preTaskCode': pre_code,
        'preTaskVersion': pre_version,
        'postTaskCode': post_code,
        'postTaskVersion': post_version,
        'conditionType': 'NONE',
        'conditionParams': {},
    }


def build_linear_relations(codes_with_versions):
    # type: (Iterable[Tuple[int, int]]) -> List[Dict[str, Any]]
    """Generate the relations for a serial task chain.

    Input [(code1, v1), (code2, v2), (code3, v3)] yields
        [
            {pre 0 -> post code1 v1},        # entry edge (preTaskCode=0 = entry)
            {pre code1 v1 -> post code2 v2},
            {pre code2 v2 -> post code3 v3},
        ]
    An empty input yields []. Any iterable of (code, version) pairs is
    accepted, including generators; only the first two items of each pair
    are read.

    The version of a newly created task is usually 1 (DS's first version is
    v1). The version of a retained task MUST be the real value from
    GET workflow-definition/{code} and must not be hard-coded to 0
    (observed in practice: a PUT with hard-coded 0 loses tasks; see the
    dw_base/ds/api.py docstring).
    """
    # Prepend the virtual entry node (code 0, version 0) so that the whole
    # chain, entry edge included, is simply consecutive pairs.
    nodes = [(0, 0)]
    nodes.extend((item[0], item[1]) for item in codes_with_versions)
    return [
        _build_relation(pre_c, pre_v, post_c, post_v)
        for (pre_c, pre_v), (post_c, post_v) in zip(nodes, nodes[1:])
    ]


def build_workflow_body(name, description, tasks, relations, locations=None,
                        execution_type='SERIAL_WAIT', timeout=0, global_params='[]'):
    # type: (str, str, List[Dict[str, Any]], List[Dict[str, Any]], Optional[List[Dict[str, Any]]], str, int, Union[str, List[Dict[str, Any]]]) -> Dict[str, Any]
    """Assemble the form-encoded body for POST/PUT /workflow-definition[/{code}].

    tasks / relations / locations are JSON-serialized to strings automatically
    (the DS API expects string fields). global_params is serialized the same
    way when given as a list/tuple; a string (the default '[]') is passed
    through unchanged.

    locations:
      - None / [] -> serialized as '[]', the default for POST create
        (the DS UI lays the nodes out automatically).
      - A PUT rework MUST pass a non-empty list[{taskCode, x, y}], otherwise
        the UI does not render the nodes (see the dw_base/ds/api.py docstring
        and workspace/20260507/extend_raw_to_ods_workflows.py).

    A falsy description is sent as ''.
    """
    if isinstance(global_params, (list, tuple)):
        global_params = json.dumps(global_params, ensure_ascii=False)
    return {
        'name': name,
        'description': description or '',
        'globalParams': global_params,
        'locations': json.dumps(locations or [], ensure_ascii=False),
        'timeout': timeout,
        'executionType': execution_type,
        'taskDefinitionJson': json.dumps(tasks, ensure_ascii=False),
        'taskRelationJson': json.dumps(relations, ensure_ascii=False),
    }


def build_schedule_body(workflow_code, crontab,
                        start_time='2026-01-01 00:00:00',
                        end_time='2126-01-01 00:00:00',
                        timezone_id='Asia/Shanghai',
                        worker_group='d1~d3',
                        tenant='bigdata',
                        failure_strategy='END',
                        warning_type='NONE',
                        warning_group_id=0,
                        priority='MEDIUM',
                        environment_code=-1):
    # type: (int, str, str, str, str, str, str, str, str, int, str, int) -> Dict[str, Any]
    """Assemble the form-encoded body for POST /schedules.

    The 'schedule' field is a nested JSON string holding crontab + start/end
    time + timezone. Defaults: start/end 2026-01-01 ~ 2126-01-01 (a 100-year
    window), timezone Asia/Shanghai, and the project-convention worker group /
    tenant d1~d3 / bigdata.
    """
    schedule_inner = {
        'startTime': start_time,
        'endTime': end_time,
        'timezoneId': timezone_id,
        'crontab': crontab,
    }
    return {
        'workflowDefinitionCode': workflow_code,
        'schedule': json.dumps(schedule_inner, ensure_ascii=False),
        'failureStrategy': failure_strategy,
        'warningType': warning_type,
        'warningGroupId': warning_group_id,
        'workflowInstancePriority': priority,
        'workerGroup': worker_group,
        'tenantCode': tenant,
        'environmentCode': environment_code,
    }