# -*- coding:utf-8 -*-
"""
DolphinScheduler (DS) domain helpers: workflow / schedule lookup and
release-state switching.

Only operations that workspace scripts have re-written at least twice are
extracted here; not every endpoint is pre-wrapped (endpoints have to be
followed on every DS upgrade, and one-off scripts are short-lived, so building
the path inline is cleaner).

Every helper calls DSClient with expect_business_ok=True; per the client's
contract a business code != 0 raises RuntimeError carrying the full response.
"""
from typing import Any, Dict, Iterator, List, Optional

from dw_base.ds.api import DSClient

# The only release states accepted by the release_* helpers.
_RELEASE_STATES = ('ONLINE', 'OFFLINE')


def _iter_paged(client, path, page_size, extra_params=None):
    # type: (DSClient, str, int, Optional[Dict[str, Any]]) -> Iterator[Dict[str, Any]]
    """Yield every item of a DS paging endpoint, fetching pages lazily.

    Starts at pageNo=1 and keeps requesting the next page until one of:
      * a page comes back empty;
      * the response reports an integer ``total`` and that many items were seen;
      * no ``total`` is reported and the page is shorter than ``page_size``.

    Being a generator, a caller that stops iterating early (e.g. on the first
    match) does not trigger requests for the remaining pages.

    :param client: DS client exposing ``get(path, params=..., expect_business_ok=...)``.
    :param path: endpoint path, e.g. ``/projects/<code>/workflow-definition``.
    :param page_size: value sent as ``pageSize`` on every request.
    :param extra_params: optional extra query parameters merged into each request.
    """
    params = {'pageNo': 1, 'pageSize': page_size, 'searchVal': ''}
    if extra_params:
        params.update(extra_params)

    seen = 0
    while True:
        # Send a copy so bumping pageNo below never mutates what the client got.
        resp = client.get(path, params=dict(params), expect_business_ok=True)
        data = resp['data']
        page = data['totalList'] or []
        for item in page:
            yield item

        if not page:
            return
        seen += len(page)
        total = data.get('total')
        if isinstance(total, int):
            # Trust the server-side count when present: this also terminates
            # if the server caps pageSize below what was requested.
            if seen >= total:
                return
        elif len(page) < page_size:
            return
        params['pageNo'] += 1


def _check_release_state(state):
    # type: (str) -> None
    """Raise ValueError unless ``state`` is 'ONLINE' or 'OFFLINE'."""
    if state not in _RELEASE_STATES:
        raise ValueError("state 必须是 'ONLINE' 或 'OFFLINE':" + repr(state))


def find_workflow_by_name(client, project_code, name, page_size=200):
    # type: (DSClient, int, str, int) -> Optional[Dict[str, Any]]
    """Find a workflow definition by exact name; return None when absent.

    Pages through the project's workflow list ``page_size`` rows at a time
    (default 200) and stops at the first exact ``name`` match, so projects
    larger than one page are searched completely.

    :return: the matching workflow dict as returned by DS, or None.
    """
    path = '/projects/{}/workflow-definition'.format(project_code)
    for workflow in _iter_paged(client, path, page_size):
        if workflow['name'] == name:
            return workflow
    return None


def list_workflow_names(client, project_code, page_size=200):
    # type: (DSClient, int, int) -> List[str]
    """List the names of all workflows in a project (for idempotent skips).

    All pages are fetched, ``page_size`` rows per request, so the result is
    not truncated for projects with more than ``page_size`` workflows.
    """
    path = '/projects/{}/workflow-definition'.format(project_code)
    return [workflow['name'] for workflow in _iter_paged(client, path, page_size)]


def get_workflow_full(client, project_code, wf_code):
    # type: (DSClient, int, int) -> Dict[str, Any]
    """Fetch the full definition of a workflow.

    Returns {workflowDefinition, taskDefinitionList, workflowTaskRelationList}.

    Must be run before a PUT update: take the real version from
    taskDefinitionList[0]['version'] (it must not be hard-coded to 0).
    """
    resp = client.get(
        '/projects/{}/workflow-definition/{}'.format(project_code, wf_code),
        expect_business_ok=True,
    )
    return resp['data']


def gen_task_codes(client, project_code, n=1):
    # type: (DSClient, int, int) -> List[int]
    """Pre-flight GET that obtains ``n`` real (long) task codes.

    Must be run before creating a workflow: passing 0 as a task code fails
    with code 50036 "check workflow task relation error".
    """
    resp = client.get(
        '/projects/{}/task-definition/gen-task-codes'.format(project_code),
        params={'genNum': n},
        expect_business_ok=True,
    )
    return resp['data']


def find_schedule_by_workflow(client, project_code, wf_code, page_size=100):
    # type: (DSClient, int, int, int) -> Optional[Dict[str, Any]]
    """Find the schedule attached to a workflow code; return None when absent.

    ``workflowDefinitionCode`` is sent as a query filter, and every returned
    row is additionally checked client-side, paging through all results, so
    the lookup stays correct even if the server does not apply the filter.

    :param page_size: rows requested per page (default 100).
    :return: schedule dict containing id / releaseState /
        workflowDefinitionCode among other fields, or None.
    """
    path = '/projects/{}/schedules'.format(project_code)
    schedules = _iter_paged(
        client, path, page_size,
        extra_params={'workflowDefinitionCode': wf_code},
    )
    for schedule in schedules:
        if schedule.get('workflowDefinitionCode') == wf_code:
            return schedule
    return None


def release_workflow(client, project_code, wf_code, state):
    # type: (DSClient, int, int, str) -> Dict[str, Any]
    """Switch a workflow online / offline.

    :param state: 'ONLINE' or 'OFFLINE'.
    :raises ValueError: if ``state`` is anything else (no request is sent).
    :return: the client's response for the release POST.
    """
    _check_release_state(state)
    return client.post(
        '/projects/{}/workflow-definition/{}/release'.format(project_code, wf_code),
        form_data={'releaseState': state},
        expect_business_ok=True,
    )


def release_schedule(client, project_code, sched_id, state):
    # type: (DSClient, int, int, str) -> Dict[str, Any]
    """Switch a schedule online / offline.

    DS splits ONLINE / OFFLINE into two endpoints, /schedules/{id}/online and
    /schedules/{id}/offline, each taking an empty form_data body.

    :param state: 'ONLINE' or 'OFFLINE'.
    :raises ValueError: if ``state`` is anything else (no request is sent).
    :return: the client's response for the POST.
    """
    _check_release_state(state)
    suffix = 'online' if state == 'ONLINE' else 'offline'
    return client.post(
        '/projects/{}/schedules/{}/{}'.format(project_code, sched_id, suffix),
        form_data={},
        expect_business_ok=True,
    )