| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # -*- coding:utf-8 -*-
- """
- DolphinScheduler 工作流 / schedule 查询 + 状态切换领域 helper。
- 只抽 workspace 脚本反复重写 ≥ 2 次的操作;不预封所有 endpoint
- (DS 升级 endpoint 全要跟,一次性脚本生命周期短,直接拼 path 干净)。
- 所有 helper 都用 DSClient(expect_business_ok=True),业务码 != 0
- 直接抛 RuntimeError 含完整 response。
- """
- from typing import Any, Dict, List, Optional
- from dw_base.ds.api import DSClient
- def find_workflow_by_name(client, project_code, name, page_size=200):
- # type: (DSClient, int, str, int) -> Optional[Dict[str, Any]]
- """按名找工作流,找不到返 None。
- 搜索仅在前 page_size 条内(默认 200,覆盖中等规模项目)。
- """
- resp = client.get(
- '/projects/{}/workflow-definition'.format(project_code),
- params={'pageNo': 1, 'pageSize': page_size, 'searchVal': ''},
- expect_business_ok=True,
- )
- for w in resp['data']['totalList']:
- if w['name'] == name:
- return w
- return None
- def list_workflow_names(client, project_code, page_size=200):
- # type: (DSClient, int, int) -> List[str]
- """列项目下所有工作流名(idempotent 跳过用)。"""
- resp = client.get(
- '/projects/{}/workflow-definition'.format(project_code),
- params={'pageNo': 1, 'pageSize': page_size, 'searchVal': ''},
- expect_business_ok=True,
- )
- return [w['name'] for w in resp['data']['totalList']]
- def get_workflow_full(client, project_code, wf_code):
- # type: (DSClient, int, int) -> Dict[str, Any]
- """拿工作流完整定义。
- 返 {workflowDefinition, taskDefinitionList, workflowTaskRelationList}。
- PUT 改造前必跑:从 taskDefinitionList[0]['version'] 拿真实 version
- (不能硬编 0)。
- """
- resp = client.get(
- '/projects/{}/workflow-definition/{}'.format(project_code, wf_code),
- expect_business_ok=True,
- )
- return resp['data']
- def gen_task_codes(client, project_code, n=1):
- # type: (DSClient, int, int) -> List[int]
- """前置 GET,拿 n 个真实 long taskCode。
- 建工作流前必跑:传 0 报 code 50036 check workflow task relation error。
- """
- resp = client.get(
- '/projects/{}/task-definition/gen-task-codes'.format(project_code),
- params={'genNum': n},
- expect_business_ok=True,
- )
- return resp['data']
- def find_schedule_by_workflow(client, project_code, wf_code):
- # type: (DSClient, int, int) -> Optional[Dict[str, Any]]
- """按 workflow code 找 schedule,找不到返 None。
- 返字典含 id / releaseState / workflowDefinitionCode 等字段。
- """
- resp = client.get(
- '/projects/{}/schedules'.format(project_code),
- params={'pageNo': 1, 'pageSize': 100, 'searchVal': '',
- 'workflowDefinitionCode': wf_code},
- expect_business_ok=True,
- )
- for s in resp['data']['totalList']:
- if s.get('workflowDefinitionCode') == wf_code:
- return s
- return None
- def release_workflow(client, project_code, wf_code, state):
- # type: (DSClient, int, int, str) -> Dict[str, Any]
- """切 workflow 上下线状态。state in {'ONLINE', 'OFFLINE'}。"""
- if state not in ('ONLINE', 'OFFLINE'):
- raise ValueError("state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state))
- return client.post(
- '/projects/{}/workflow-definition/{}/release'.format(project_code, wf_code),
- form_data={'releaseState': state},
- expect_business_ok=True,
- )
- def release_schedule(client, project_code, sched_id, state):
- # type: (DSClient, int, int, str) -> Dict[str, Any]
- """切 schedule 上下线状态。state in {'ONLINE', 'OFFLINE'}。
- DS 把 ONLINE / OFFLINE 拆成两条 endpoint:/schedules/{id}/online
- 与 /schedules/{id}/offline,body 空 form_data。
- """
- if state not in ('ONLINE', 'OFFLINE'):
- raise ValueError("state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state))
- suffix = 'online' if state == 'ONLINE' else 'offline'
- return client.post(
- '/projects/{}/schedules/{}/{}'.format(project_code, sched_id, suffix),
- form_data={},
- expect_business_ok=True,
- )
|