workflow.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
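# -*- coding:utf-8 -*-
"""
Domain helpers for DolphinScheduler workflow / schedule lookup and state switching.

Only operations that workspace scripts have rewritten at least twice are
extracted here; endpoints are deliberately not all wrapped up front (every DS
upgrade would force all of them to follow, and one-off scripts are short-lived,
so building the path inline is cleaner).

Every helper calls DSClient with expect_business_ok=True: a business code
!= 0 raises RuntimeError carrying the full response.
"""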
  9. from typing import Any, Dict, List, Optional
  10. from dw_base.ds.api import DSClient
  11. def find_workflow_by_name(client, project_code, name, page_size=200):
  12. # type: (DSClient, int, str, int) -> Optional[Dict[str, Any]]
  13. """按名找工作流,找不到返 None。
  14. 搜索仅在前 page_size 条内(默认 200,覆盖中等规模项目)。
  15. """
  16. resp = client.get(
  17. '/projects/{}/workflow-definition'.format(project_code),
  18. params={'pageNo': 1, 'pageSize': page_size, 'searchVal': ''},
  19. expect_business_ok=True,
  20. )
  21. for w in resp['data']['totalList']:
  22. if w['name'] == name:
  23. return w
  24. return None
  25. def list_workflow_names(client, project_code, page_size=200):
  26. # type: (DSClient, int, int) -> List[str]
  27. """列项目下所有工作流名(idempotent 跳过用)。"""
  28. resp = client.get(
  29. '/projects/{}/workflow-definition'.format(project_code),
  30. params={'pageNo': 1, 'pageSize': page_size, 'searchVal': ''},
  31. expect_business_ok=True,
  32. )
  33. return [w['name'] for w in resp['data']['totalList']]
  34. def get_workflow_full(client, project_code, wf_code):
  35. # type: (DSClient, int, int) -> Dict[str, Any]
  36. """拿工作流完整定义。
  37. 返 {workflowDefinition, taskDefinitionList, workflowTaskRelationList}。
  38. PUT 改造前必跑:从 taskDefinitionList[0]['version'] 拿真实 version
  39. (不能硬编 0)。
  40. """
  41. resp = client.get(
  42. '/projects/{}/workflow-definition/{}'.format(project_code, wf_code),
  43. expect_business_ok=True,
  44. )
  45. return resp['data']
  46. def gen_task_codes(client, project_code, n=1):
  47. # type: (DSClient, int, int) -> List[int]
  48. """前置 GET,拿 n 个真实 long taskCode。
  49. 建工作流前必跑:传 0 报 code 50036 check workflow task relation error。
  50. """
  51. resp = client.get(
  52. '/projects/{}/task-definition/gen-task-codes'.format(project_code),
  53. params={'genNum': n},
  54. expect_business_ok=True,
  55. )
  56. return resp['data']
  57. def find_schedule_by_workflow(client, project_code, wf_code):
  58. # type: (DSClient, int, int) -> Optional[Dict[str, Any]]
  59. """按 workflow code 找 schedule,找不到返 None。
  60. 返字典含 id / releaseState / workflowDefinitionCode 等字段。
  61. """
  62. resp = client.get(
  63. '/projects/{}/schedules'.format(project_code),
  64. params={'pageNo': 1, 'pageSize': 100, 'searchVal': '',
  65. 'workflowDefinitionCode': wf_code},
  66. expect_business_ok=True,
  67. )
  68. for s in resp['data']['totalList']:
  69. if s.get('workflowDefinitionCode') == wf_code:
  70. return s
  71. return None
  72. def release_workflow(client, project_code, wf_code, state):
  73. # type: (DSClient, int, int, str) -> Dict[str, Any]
  74. """切 workflow 上下线状态。state in {'ONLINE', 'OFFLINE'}。"""
  75. if state not in ('ONLINE', 'OFFLINE'):
  76. raise ValueError("state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state))
  77. return client.post(
  78. '/projects/{}/workflow-definition/{}/release'.format(project_code, wf_code),
  79. form_data={'releaseState': state},
  80. expect_business_ok=True,
  81. )
  82. def release_schedule(client, project_code, sched_id, state):
  83. # type: (DSClient, int, int, str) -> Dict[str, Any]
  84. """切 schedule 上下线状态。state in {'ONLINE', 'OFFLINE'}。
  85. DS 把 ONLINE / OFFLINE 拆成两条 endpoint:/schedules/{id}/online
  86. 与 /schedules/{id}/offline,body 空 form_data。
  87. """
  88. if state not in ('ONLINE', 'OFFLINE'):
  89. raise ValueError("state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state))
  90. suffix = 'online' if state == 'ONLINE' else 'offline'
  91. return client.post(
  92. '/projects/{}/schedules/{}/{}'.format(project_code, sched_id, suffix),
  93. form_data={},
  94. expect_business_ok=True,
  95. )