| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # -*- coding:utf-8 -*-
- """
- DolphinScheduler form-encoded body 拼装模板。
- 抽出 workspace 脚本反复重写的 SHELL task / relations / workflow body /
- schedule body 样板。项目惯例默认值集中在常量字典,特殊场景走 overrides。
- 不预封 endpoint 调用——纯 dict 工厂。caller 拼好 body 后用
- DSClient.post / put 发出,过 expect_business_ok 收口业务级失败。
- """
- import json
- from typing import Any, Dict, List, Optional, Sequence, Tuple
# Project-convention default fields for a SHELL task. Source: the fields that
# the working task dicts under jobs/raw/* have in common.
# build_shell_task() starts from a shallow copy of this dict (all values are
# immutable scalars), so per-task overrides never modify these shared defaults.
SHELL_TASK_DEFAULTS = {
    'description': '',
    'taskType': 'SHELL',
    'flag': 'YES',
    'taskPriority': 'MEDIUM',
    'workerGroup': 'd1~d3',
    'failRetryTimes': 0,
    'failRetryInterval': 1,
    'timeoutFlag': 'CLOSE',
    'timeoutNotifyStrategy': 'WARN',
    'timeout': 0,
    'delayTime': 0,
    'environmentCode': -1,
    'taskGroupId': 0,
    'taskGroupPriority': 0,
    'cpuQuota': -1,
    'memoryMax': -1,
    'taskExecuteType': 'BATCH',
}
def build_shell_task(name, raw_script,
                     file_param=None, code=None, version=None,
                     **overrides):
    # type: (str, str, Optional[str], Optional[int], Optional[int], Any) -> Dict[str, Any]
    """Build the dict describing a single SHELL task.

    Args:
        name: Task name, stored under ``'name'``.
        raw_script: Shell script text, stored as ``taskParams.rawScript``.
        file_param: When not ``None``, one entry
            ``{prop: 'file', direct: 'IN', type: 'VARCHAR', value: file_param}``
            is added to ``taskParams.localParams`` (the ``${file}`` injection
            convention used by the existing inc_d / ods / init workflows).
        code: Task identity. Fetch the real long value via GET gen-task-codes
            before creating the workflow. Omitted from the result when
            ``None`` (in the POST-create flow the caller assigns it later).
        version: Task identity for the PUT flow. Leave unset for a new task;
            the DolphinScheduler side assigns v1 itself.
        **overrides: Applied last, on top of everything else (for example
            ``workerGroup`` / ``timeout`` / ``failRetryTimes``).

    Returns:
        A new task dict; the shared defaults dict is never mutated.
    """
    if file_param is None:
        params = []
    else:
        params = [{'prop': 'file', 'direct': 'IN',
                   'type': 'VARCHAR', 'value': file_param}]

    # Copy the project defaults first so the module-level dict stays pristine.
    shell_task = dict(SHELL_TASK_DEFAULTS, name=name)
    shell_task['taskParams'] = {
        'localParams': params,
        'rawScript': raw_script,
        'resourceList': [],
    }

    # Identity fields are written only when the caller supplied them.
    for field, value in (('code', code), ('version', version)):
        if value is not None:
            shell_task[field] = value

    # Caller overrides win over defaults and over the fields set above.
    shell_task.update(overrides)
    return shell_task
def build_linear_relations(codes_with_versions):
    # type: (Sequence[Tuple[int, int]]) -> List[Dict[str, Any]]
    """Generate the relation list for a strictly serial task chain.

    Input ``[(code1, v1), (code2, v2), (code3, v3)]`` yields::

        [
            {pre 0        -> post code1 v1},   # entry point: preTaskCode=0
            {pre code1 v1 -> post code2 v2},
            {pre code2 v2 -> post code3 v3},
        ]

    A newly created task normally uses version 1 (the first version on the
    DolphinScheduler side is v1). For a task that is being kept, the version
    must be the real value read from GET workflow-definition/{code}; it must
    not be hard-coded to 0 (a PUT with a hard-coded 0 was observed to drop
    tasks, see the dw_base/ds/api.py docstring).

    Args:
        codes_with_versions: ``(task_code, task_version)`` pairs in execution
            order. Any iterable is accepted at runtime (list, tuple,
            generator, ...); it is consumed exactly once.

    Returns:
        One relation dict per input pair, in input order; ``[]`` for empty
        input.
    """
    relations = []
    # Running predecessor; (0, 0) marks the workflow entry point.
    prev_code, prev_version = 0, 0
    for code, version in codes_with_versions:
        relations.append({
            'name': '', 'preTaskCode': prev_code, 'preTaskVersion': prev_version,
            'postTaskCode': code, 'postTaskVersion': version,
            # A fresh dict per relation so entries never share mutable state.
            'conditionType': 'NONE', 'conditionParams': {},
        })
        prev_code, prev_version = code, version
    return relations
def build_workflow_body(name, description, tasks, relations,
                        locations=None,
                        execution_type='SERIAL_WAIT',
                        timeout=0,
                        global_params='[]'):
    # type: (str, str, List[Dict[str, Any]], List[Dict[str, Any]], Optional[List[Dict[str, Any]]], str, int, str) -> Dict[str, Any]
    """Assemble the form-encoded body for POST/PUT /workflow-definition[/{code}].

    ``tasks``, ``relations`` and ``locations`` are JSON-serialized to strings
    here (the DolphinScheduler API expects string fields); ``global_params``
    is passed through unchanged.

    Args:
        name: Workflow name.
        description: Workflow description; any falsy value becomes ``''``.
        tasks: Task definition dicts, serialized into ``taskDefinitionJson``.
        relations: Task relation dicts, serialized into ``taskRelationJson``.
        locations: Node layout.
            - ``None`` / ``[]`` serializes to ``'[]'``, the default for the
              POST-create flow (the DS UI lays nodes out automatically).
            - In the PUT-modify flow a non-empty list of
              ``{taskCode, x, y}`` must be passed, otherwise the UI does not
              render the nodes (see the dw_base/ds/api.py docstring and
              workspace/20260507/extend_raw_to_ods_workflows.py).
        execution_type: Stored under ``executionType``.
        timeout: Stored under ``timeout``.
        global_params: Stored under ``globalParams`` as given.

    Returns:
        The request body as a plain dict.
    """
    def _as_json(payload):
        # Keep non-ASCII characters readable instead of \u-escaping them.
        return json.dumps(payload, ensure_ascii=False)

    body = {
        'name': name,
        'description': description if description else '',
        'globalParams': global_params,
    }
    body['locations'] = _as_json(locations if locations else [])
    body['timeout'] = timeout
    body['executionType'] = execution_type
    body['taskDefinitionJson'] = _as_json(tasks)
    body['taskRelationJson'] = _as_json(relations)
    return body
def build_schedule_body(workflow_code, crontab,
                        start_time='2026-01-01 00:00:00',
                        end_time='2126-01-01 00:00:00',
                        timezone_id='Asia/Shanghai',
                        worker_group='d1~d3',
                        tenant='bigdata',
                        failure_strategy='END',
                        warning_type='NONE',
                        warning_group_id=0,
                        priority='MEDIUM',
                        environment_code=-1):
    # type: (int, str, str, str, str, str, str, str, str, int, str, int) -> Dict[str, Any]
    """Assemble the form-encoded body for POST /schedules.

    The ``schedule`` field is a nested JSON string holding the start/end
    time, the time zone and the crontab expression. Defaults: a hundred-year
    window 2026-01-01 ~ 2126-01-01, time zone Asia/Shanghai, and the project
    conventions ``d1~d3`` / ``bigdata`` for worker group / tenant.

    Args:
        workflow_code: Stored under ``workflowDefinitionCode``.
        crontab: Cron expression placed inside the nested ``schedule`` JSON.
        start_time: Schedule window start (inside ``schedule``).
        end_time: Schedule window end (inside ``schedule``).
        timezone_id: Time zone id (inside ``schedule``).
        worker_group: Stored under ``workerGroup``.
        tenant: Stored under ``tenantCode``.
        failure_strategy: Stored under ``failureStrategy``.
        warning_type: Stored under ``warningType``.
        warning_group_id: Stored under ``warningGroupId``.
        priority: Stored under ``workflowInstancePriority``.
        environment_code: Stored under ``environmentCode``.

    Returns:
        The request body as a plain dict.
    """
    # Serialize the nested schedule first; the API takes it as one string field.
    schedule_json = json.dumps(
        {
            'startTime': start_time,
            'endTime': end_time,
            'timezoneId': timezone_id,
            'crontab': crontab,
        },
        ensure_ascii=False
    )

    body = {'workflowDefinitionCode': workflow_code, 'schedule': schedule_json}
    # Failure handling and alerting.
    body['failureStrategy'] = failure_strategy
    body['warningType'] = warning_type
    body['warningGroupId'] = warning_group_id
    # Where and how the workflow instance runs.
    body['workflowInstancePriority'] = priority
    body['workerGroup'] = worker_group
    body['tenantCode'] = tenant
    body['environmentCode'] = environment_code
    return body
|