| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*- coding:utf-8 -*-
- """
- DolphinScheduler API 客户端最小封装。
- 职责:
- - 读 conf/ds.ini 拿 base_url + token
- - requests.Session 注入 token header
- - 通用 get / post / put:2xx 返 JSON;非 2xx 抛 RuntimeError;
- expect_business_ok=True 时还会检查 DS 业务级 code == 0
- - 不预封任何具体 endpoint,调用方按需拼 path
- DS 业务级响应约定:
- - 2xx HTTP + body 形如 {"code": 0, "data": {...}, "msg": "success"}
- - code != 0 表示业务级失败(参数错 / 权限 / 资源冲突);HTTP 仍 2xx
- - expect_business_ok=True 把这两层失败收口到一个 RuntimeError 里
- 含 method + path + 完整 response,caller 不再需要自己写 post_check helper
- DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
- - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
- task identity;缺则被当新建 task 处理,原 task 被丢弃
- - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
- - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
- (taskDefinitionList 数据上有,但 DAG 看不到)
- 工作流 body 拼装:dw_base/ds/builders.py
- 工作流查询 / 状态切换:dw_base/ds/workflow.py
- 完整使用样板:workspace/20260507/extend_raw_to_ods_workflows.py
- """
- import os
- from configparser import ConfigParser
- from typing import Any, Dict, Optional
- import requests
- _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
- class DSClient:
- def __init__(self, conf_path=None):
- # type: (Optional[str]) -> None
- path = conf_path or _DEFAULT_CONF_PATH
- if not os.path.isfile(path):
- raise RuntimeError('DS 配置文件不存在:' + path)
- cp = ConfigParser()
- cp.read(path, encoding='utf-8')
- self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
- self.token = cp.get('dolphinscheduler', 'token')
- self.session = requests.Session()
- self.session.headers['token'] = self.token
- def _url(self, path):
- # type: (str) -> str
- return self.base_url + '/' + path.lstrip('/')
- def _request(self, method, path,
- params=None, json_body=None, form_data=None,
- expect_business_ok=False):
- # type: (str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
- """get / post / put 共用。3 层错误统一收口:
- 1. 非 2xx HTTP → RuntimeError 含 method/path/rc/text
- 2. 2xx + 非 JSON(path 错被 SPA fallback 返 HTML)→ RuntimeError 含 raw text 前 200 字符
- 3. expect_business_ok=True 且 dict 响应 code != 0 → RuntimeError 含完整 response
- """
- if json_body is not None and form_data is not None:
- raise ValueError('json_body 与 form_data 互斥')
- kwargs = {}
- if params is not None:
- kwargs['params'] = params
- if form_data is not None:
- kwargs['data'] = form_data
- elif json_body is not None:
- kwargs['json'] = json_body
- resp = self.session.request(method, self._url(path), **kwargs)
- if resp.status_code // 100 != 2:
- raise RuntimeError('{} {} failed rc={}: {}'.format(
- method, path, resp.status_code, resp.text))
- try:
- data = resp.json()
- except ValueError:
- raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
- method, path, resp.text[:200]))
- if expect_business_ok and isinstance(data, dict) and data.get('code') != 0:
- raise RuntimeError('{} {} 业务码 != 0:{}'.format(method, path, data))
- return data
- def get(self, path, params=None, expect_business_ok=False):
- # type: (str, Optional[Dict[str, Any]], bool) -> Any
- return self._request('GET', path, params=params,
- expect_business_ok=expect_business_ok)
- def post(self, path, json_body=None, form_data=None, expect_business_ok=False):
- # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
- return self._request('POST', path,
- json_body=json_body, form_data=form_data,
- expect_business_ok=expect_business_ok)
- def put(self, path, json_body=None, form_data=None, expect_business_ok=False):
- # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
- return self._request('PUT', path,
- json_body=json_body, form_data=form_data,
- expect_business_ok=expect_business_ok)
|