# -*- coding:utf-8 -*-
"""
Templates for assembling DolphinScheduler form-encoded request bodies.

Factors out the SHELL task / relations / workflow body / schedule body
boilerplate that the workspace scripts kept rewriting. Project-convention
defaults live in constant dicts; special cases go through overrides.

No endpoint calls are wrapped here -- these are pure dict factories. The
caller assembles the body, sends it with DSClient.post / put, and passes the
response through expect_business_ok to catch business-level failures.
"""
import json
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
  11. # 项目惯例 SHELL task 默认值。来源:jobs/raw/* 跑得通的 task 字典共同字段。
  12. SHELL_TASK_DEFAULTS = {
  13. 'description': '',
  14. 'taskType': 'SHELL',
  15. 'flag': 'YES',
  16. 'taskPriority': 'MEDIUM',
  17. 'workerGroup': 'd1~d3',
  18. 'failRetryTimes': 0,
  19. 'failRetryInterval': 1,
  20. 'timeoutFlag': 'CLOSE',
  21. 'timeoutNotifyStrategy': 'WARN',
  22. 'timeout': 0,
  23. 'delayTime': 0,
  24. 'environmentCode': -1,
  25. 'taskGroupId': 0,
  26. 'taskGroupPriority': 0,
  27. 'cpuQuota': -1,
  28. 'memoryMax': -1,
  29. 'taskExecuteType': 'BATCH',
  30. }
  31. def build_shell_task(name, raw_script,
  32. file_param=None, code=None, version=None,
  33. **overrides):
  34. # type: (str, str, Optional[str], Optional[int], Optional[int], Any) -> Dict[str, Any]
  35. """构造单个 SHELL task 字典。
  36. - file_param 非空时注入 localParams 一项
  37. {prop:'file', direct:'IN', type:'VARCHAR', value:file_param}
  38. (兼容现 inc_d / ods / init 工作流的 ${file} 注入约定)
  39. - code 用于 task identity(建工作流前先 GET gen-task-codes 拿真实 long);
  40. 不传则不写入字典(POST 创建场景由 caller 在 main 里赋)
  41. - version 用于 PUT 场景 task identity;新建 task 不传,DS 端自动赋 v1
  42. - overrides 覆盖默认值(如 workerGroup / timeout / failRetryTimes)
  43. """
  44. task = dict(SHELL_TASK_DEFAULTS)
  45. task['name'] = name
  46. local_params = []
  47. if file_param is not None:
  48. local_params.append({'prop': 'file', 'direct': 'IN',
  49. 'type': 'VARCHAR', 'value': file_param})
  50. task['taskParams'] = {
  51. 'localParams': local_params,
  52. 'rawScript': raw_script,
  53. 'resourceList': [],
  54. }
  55. if code is not None:
  56. task['code'] = code
  57. if version is not None:
  58. task['version'] = version
  59. task.update(overrides)
  60. return task
  61. def build_linear_relations(codes_with_versions):
  62. # type: (Sequence[Tuple[int, int]]) -> List[Dict[str, Any]]
  63. """串行 task chain 自动生成 relations。
  64. 输入 [(code1, v1), (code2, v2), (code3, v3)] →
  65. [
  66. {pre 0 → post code1 v1}, # 起点(preTaskCode=0 = 入口)
  67. {pre code1 v1 → post code2 v2},
  68. {pre code2 v2 → post code3 v3},
  69. ]
  70. 新建 task 的 version 通常用 1(DS 端首版即 v1);
  71. 保留 task 的 version 必须从 GET workflow-definition/{code} 拿真实值,
  72. 不能硬编 0(实证:硬编 0 PUT 会丢任务,详见 dw_base/ds/api.py docstring)。
  73. """
  74. if not codes_with_versions:
  75. return []
  76. relations = [{
  77. 'name': '', 'preTaskCode': 0, 'preTaskVersion': 0,
  78. 'postTaskCode': codes_with_versions[0][0],
  79. 'postTaskVersion': codes_with_versions[0][1],
  80. 'conditionType': 'NONE', 'conditionParams': {},
  81. }]
  82. for i in range(1, len(codes_with_versions)):
  83. prev_c, prev_v = codes_with_versions[i - 1]
  84. cur_c, cur_v = codes_with_versions[i]
  85. relations.append({
  86. 'name': '', 'preTaskCode': prev_c, 'preTaskVersion': prev_v,
  87. 'postTaskCode': cur_c, 'postTaskVersion': cur_v,
  88. 'conditionType': 'NONE', 'conditionParams': {},
  89. })
  90. return relations
  91. def build_workflow_body(name, description, tasks, relations,
  92. locations=None,
  93. execution_type='SERIAL_WAIT',
  94. timeout=0,
  95. global_params='[]'):
  96. # type: (str, str, List[Dict[str, Any]], List[Dict[str, Any]], Optional[List[Dict[str, Any]]], str, int, str) -> Dict[str, Any]
  97. """拼装 POST/PUT /workflow-definition[/{code}] 的 form-encoded body。
  98. tasks / relations / locations 自动 JSON 序列化为字符串(DS API 要求 string 字段)。
  99. locations:
  100. - None / [] → 序列化为 '[]',POST 创建场景默认值(DS UI 自动布局)
  101. - PUT 改造场景必须传非空 list[{taskCode, x, y}],否则 UI 不渲染节点
  102. (详见 dw_base/ds/api.py docstring 与 workspace/20260507/extend_raw_to_ods_workflows.py)
  103. """
  104. return {
  105. 'name': name,
  106. 'description': description or '',
  107. 'globalParams': global_params,
  108. 'locations': json.dumps(locations or [], ensure_ascii=False),
  109. 'timeout': timeout,
  110. 'executionType': execution_type,
  111. 'taskDefinitionJson': json.dumps(tasks, ensure_ascii=False),
  112. 'taskRelationJson': json.dumps(relations, ensure_ascii=False),
  113. }
  114. def build_schedule_body(workflow_code, crontab,
  115. start_time='2026-01-01 00:00:00',
  116. end_time='2126-01-01 00:00:00',
  117. timezone_id='Asia/Shanghai',
  118. worker_group='d1~d3',
  119. tenant='bigdata',
  120. failure_strategy='END',
  121. warning_type='NONE',
  122. warning_group_id=0,
  123. priority='MEDIUM',
  124. environment_code=-1):
  125. # type: (int, str, str, str, str, str, str, str, str, int, str, int) -> Dict[str, Any]
  126. """拼装 POST /schedules 的 form-encoded body。
  127. schedule 字段是嵌套 JSON 字符串:含 crontab + 起止时间 + 时区。
  128. 默认起止 2026-01-01 ~ 2126-01-01(百年窗口);时区 Asia/Shanghai;
  129. worker / tenant 用项目惯例 d1~d3 / bigdata。
  130. """
  131. schedule_inner = {
  132. 'startTime': start_time,
  133. 'endTime': end_time,
  134. 'timezoneId': timezone_id,
  135. 'crontab': crontab,
  136. }
  137. return {
  138. 'workflowDefinitionCode': workflow_code,
  139. 'schedule': json.dumps(schedule_inner, ensure_ascii=False),
  140. 'failureStrategy': failure_strategy,
  141. 'warningType': warning_type,
  142. 'warningGroupId': warning_group_id,
  143. 'workflowInstancePriority': priority,
  144. 'workerGroup': worker_group,
  145. 'tenantCode': tenant,
  146. 'environmentCode': environment_code,
  147. }