# -*- coding:utf-8 -*-
"""
Minimal client wrapper for the DolphinScheduler (DS) REST API.

Responsibilities:
- Read ``base_url`` + ``token`` from conf/ds.ini.
- Inject the token header into a ``requests.Session``.
- Generic get / post / put: return the parsed JSON on 2xx; raise RuntimeError
  on non-2xx. With ``expect_business_ok=True`` the DS business-level
  ``code == 0`` is checked as well.
- No concrete endpoint is pre-wrapped; callers build the path as needed.

DS business-level response convention:
- 2xx HTTP + a body shaped like {"code": 0, "data": {...}, "msg": "success"}.
- code != 0 signals a business-level failure (bad parameters / permission /
  resource conflict) while the HTTP status is still 2xx.
- expect_business_ok=True folds both failure layers into one RuntimeError
  carrying method + path + the full response, so callers no longer need
  their own post_check helper.

Required fields for DS 3.4 PUT /workflow-definition/{code} (learned in practice):
- Every task dict in taskDefinitionJson must contain a ``version`` field.
  DS identifies a task by (code, version); without it the task is treated
  as a new task and the original task is discarded.
- preTaskVersion / postTaskVersion in taskRelationJson must use the task's
  real version; do not hard-code 0.
- ``locations`` must be passed as [{taskCode, x, y}, ...] covering every
  task's coordinates, otherwise the UI renders no nodes (the data is present
  in taskDefinitionList, but the DAG shows nothing).

Workflow body assembly: dw_base/ds/builders.py
Workflow query / state switching: dw_base/ds/workflow.py
Full usage example: workspace/20260507/extend_raw_to_ods_workflows.py
"""
import os
from configparser import ConfigParser
from typing import Any, Dict, Optional

import requests

# Project root: two directory levels above the directory holding this file.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
# Default per-request timeout in seconds. Without a timeout, requests waits
# indefinitely on a stalled server.
_DEFAULT_TIMEOUT = 60


class DSClient:
    """Thin DolphinScheduler REST client bound to one base_url + token.

    Can be used as a context manager so the underlying HTTP session is
    closed when done::

        with DSClient() as ds:
            ds.get(some_path, expect_business_ok=True)
    """

    def __init__(self, conf_path=None, timeout=_DEFAULT_TIMEOUT):
        # type: (Optional[str], Any) -> None
        """Load connection settings and prepare an authenticated session.

        :param conf_path: path to the ini file; defaults to
            ``<project root>/conf/ds.ini``. The file must contain a
            ``[dolphinscheduler]`` section with ``base_url`` and ``token``.
        :param timeout: timeout forwarded to every ``requests`` call (seconds,
            or a ``(connect, read)`` tuple); ``None`` waits forever.
        :raises RuntimeError: if the config file does not exist.
        """
        path = conf_path or _DEFAULT_CONF_PATH
        if not os.path.isfile(path):
            raise RuntimeError('DS 配置文件不存在:' + path)
        cp = ConfigParser()
        cp.read(path, encoding='utf-8')
        # Strip the trailing slash so _url() can always join with exactly one '/'.
        self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
        self.token = cp.get('dolphinscheduler', 'token')
        self.timeout = timeout
        self.session = requests.Session()
        # Every request made through this session carries the token header.
        self.session.headers['token'] = self.token

    def close(self):
        # type: () -> None
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self):
        # type: () -> DSClient
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # type: (Any, Any, Any) -> None
        self.close()

    def _url(self, path):
        # type: (str) -> str
        """Join ``base_url`` and ``path`` with a single '/' between them."""
        return self.base_url + '/' + path.lstrip('/')

    def _request(self, method, path, params=None, json_body=None, form_data=None,
                 expect_business_ok=False):
        # type: (str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
        """Shared implementation behind get / post / put.

        Three failure layers all surface as RuntimeError:
        1. Non-2xx HTTP -> message includes method / path / status code / body text.
        2. 2xx but a non-JSON body (e.g. a wrong path answered by an SPA
           fallback returning HTML) -> message includes the first 200 chars
           of the raw body.
        3. expect_business_ok=True and a dict response whose ``code`` != 0
           -> message includes the full parsed response.

        :param method: HTTP method, e.g. 'GET'.
        :param path: path relative to ``base_url`` (leading '/' optional).
        :param params: query-string parameters.
        :param json_body: JSON request body; mutually exclusive with form_data.
        :param form_data: form-encoded request body; mutually exclusive with json_body.
        :param expect_business_ok: also enforce the DS business ``code == 0``.
        :returns: the parsed JSON body (any JSON type).
        :raises ValueError: if both json_body and form_data are given.
        :raises RuntimeError: on any of the three failure layers above.
        """
        if json_body is not None and form_data is not None:
            raise ValueError('json_body 与 form_data 互斥')
        kwargs = {'timeout': self.timeout}  # type: Dict[str, Any]
        if params is not None:
            kwargs['params'] = params
        if form_data is not None:
            kwargs['data'] = form_data
        elif json_body is not None:
            kwargs['json'] = json_body
        resp = self.session.request(method, self._url(path), **kwargs)
        if resp.status_code // 100 != 2:
            raise RuntimeError('{} {} failed rc={}: {}'.format(
                method, path, resp.status_code, resp.text))
        try:
            data = resp.json()
        except ValueError as err:
            # Chain the decode error so the original cause stays in the traceback.
            raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
                method, path, resp.text[:200])) from err
        # Non-dict JSON (e.g. a bare list) carries no business code and passes through.
        if expect_business_ok and isinstance(data, dict) and data.get('code') != 0:
            raise RuntimeError('{} {} 业务码 != 0:{}'.format(method, path, data))
        return data

    def get(self, path, params=None, expect_business_ok=False):
        # type: (str, Optional[Dict[str, Any]], bool) -> Any
        """Send a GET request; see ``_request`` for error semantics."""
        return self._request('GET', path, params=params,
                             expect_business_ok=expect_business_ok)

    def post(self, path, json_body=None, form_data=None, expect_business_ok=False):
        # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
        """Send a POST request with a JSON or form body; see ``_request``."""
        return self._request('POST', path, json_body=json_body, form_data=form_data,
                             expect_business_ok=expect_business_ok)

    def put(self, path, json_body=None, form_data=None, expect_business_ok=False):
        # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
        """Send a PUT request with a JSON or form body; see ``_request``."""
        return self._request('PUT', path, json_body=json_body, form_data=form_data,
                             expect_business_ok=expect_business_ok)