# -*- coding:utf-8 -*-
"""
Minimal wrapper around the DolphinScheduler HTTP API.

Responsibilities:
- Read conf/ds.ini to obtain base_url + token
- Inject the token header into a requests.Session
- Generic get / post: a 2xx response returns the decoded JSON body,
  anything else raises RuntimeError
- No concrete endpoint is pre-wrapped; callers build the path they need
"""
import os
from configparser import ConfigParser
from typing import Any, Dict, Optional

import requests

# This module is expected to live three directory levels below the project root.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')

# Seconds to wait on a request before giving up; requests itself never times
# out unless told to, which would let a stalled server block the caller forever.
_DEFAULT_TIMEOUT = 30.0


class DSClient:
    """Thin DolphinScheduler API client authenticated through a ``token`` header."""

    def __init__(self, conf_path: Optional[str] = None,
                 timeout: Optional[float] = _DEFAULT_TIMEOUT):
        """Load connection settings and prepare an authenticated session.

        Args:
            conf_path: Path of the ini file. Falls back to ``conf/ds.ini``
                under the project root when omitted or empty.
            timeout: Value passed as ``timeout`` to every request. ``None``
                disables the timeout (the behaviour before this parameter
                existed).

        Raises:
            RuntimeError: The config file does not exist.
            configparser.Error: The ``[dolphinscheduler]`` section or one of
                its ``base_url`` / ``token`` options is missing.
        """
        path = conf_path or _DEFAULT_CONF_PATH
        if not os.path.isfile(path):
            raise RuntimeError('DS 配置文件不存在:' + path)

        cp = ConfigParser()
        cp.read(path, encoding='utf-8')

        # Strip trailing slashes so _url() can always join with exactly one '/'.
        self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
        self.token = cp.get('dolphinscheduler', 'token')
        self.timeout = timeout

        self.session = requests.Session()
        self.session.headers['token'] = self.token

    def _url(self, path: str) -> str:
        """Join ``path`` onto ``base_url`` with exactly one separating slash."""
        return self.base_url + '/' + path.lstrip('/')

    def _handle_response(self, method: str, path: str, resp: Any) -> Any:
        """Validate ``resp`` and return its decoded JSON body.

        Args:
            method: HTTP verb, used only in error messages.
            path: Request path as supplied by the caller, used only in
                error messages.
            resp: The response object returned by the session.

        Raises:
            RuntimeError: The status code is not 2xx, or the body is not JSON.
        """
        if resp.status_code // 100 != 2:
            raise RuntimeError('{} {} failed rc={}: {}'.format(
                method, path, resp.status_code, resp.text))
        try:
            return resp.json()
        except ValueError as err:
            # Only the first 200 characters are reported to keep the message short.
            raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
                method, path, resp.text[:200])) from err

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Issue a GET request and return the decoded JSON body.

        Args:
            path: Path relative to ``base_url``; a leading slash is optional.
            params: Optional query-string parameters.

        Raises:
            RuntimeError: The status code is not 2xx, or the body is not JSON.
        """
        resp = self.session.get(self._url(path), params=params, timeout=self.timeout)
        return self._handle_response('GET', path, resp)

    def post(self, path: str, json_body: Optional[Dict[str, Any]] = None,
             form_data: Optional[Dict[str, Any]] = None) -> Any:
        """Issue a POST request and return the decoded JSON body.

        Args:
            path: Path relative to ``base_url``; a leading slash is optional.
            json_body: Payload sent as a JSON body.
            form_data: Payload sent form-encoded. Mutually exclusive with
                ``json_body``.

        Raises:
            ValueError: Both ``json_body`` and ``form_data`` were supplied.
            RuntimeError: The status code is not 2xx, or the body is not JSON.
        """
        if json_body is not None and form_data is not None:
            raise ValueError('json_body 与 form_data 互斥')

        url = self._url(path)
        if form_data is not None:
            resp = self.session.post(url, data=form_data, timeout=self.timeout)
        else:
            resp = self.session.post(url, json=json_body, timeout=self.timeout)
        return self._handle_response('POST', path, resp)