| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- # -*- coding:utf-8 -*-
- """
- DolphinScheduler API 客户端最小封装。
- 职责:
- - 读 conf/ds.ini 拿 base_url + token
- - requests.Session 注入 token header
- - 通用 get / post,2xx 返回 json,非 2xx 抛 RuntimeError
- - 不预封任何具体 endpoint,调用方按需拼 path
- DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
- - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
- task identity;缺则被当新建 task 处理,原 task 被丢弃
- - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
- - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
- (taskDefinitionList 数据上有,但 DAG 看不到)
- 实现参考:workspace/20260507/extend_raw_to_ods_workflows.py(含 verify_checklist 校验)
- """
- import os
- from configparser import ConfigParser
- from typing import Any, Dict, Optional
- import requests
- _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
- class DSClient:
- def __init__(self, conf_path: Optional[str] = None):
- path = conf_path or _DEFAULT_CONF_PATH
- if not os.path.isfile(path):
- raise RuntimeError('DS 配置文件不存在:' + path)
- cp = ConfigParser()
- cp.read(path, encoding='utf-8')
- self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
- self.token = cp.get('dolphinscheduler', 'token')
- self.session = requests.Session()
- self.session.headers['token'] = self.token
- def _url(self, path: str) -> str:
- return self.base_url + '/' + path.lstrip('/')
- def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
- resp = self.session.get(self._url(path), params=params)
- if resp.status_code // 100 != 2:
- raise RuntimeError('GET {} failed rc={}: {}'.format(
- path, resp.status_code, resp.text))
- try:
- return resp.json()
- except ValueError:
- raise RuntimeError('GET {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
- path, resp.text[:200]))
- def post(self, path: str, json_body: Optional[Dict[str, Any]] = None,
- form_data: Optional[Dict[str, Any]] = None) -> Any:
- if json_body is not None and form_data is not None:
- raise ValueError('json_body 与 form_data 互斥')
- if form_data is not None:
- resp = self.session.post(self._url(path), data=form_data)
- else:
- resp = self.session.post(self._url(path), json=json_body)
- if resp.status_code // 100 != 2:
- raise RuntimeError('POST {} failed rc={}: {}'.format(
- path, resp.status_code, resp.text))
- try:
- return resp.json()
- except ValueError:
- raise RuntimeError('POST {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
- path, resp.text[:200]))
|