| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # -*- coding:utf-8 -*-
- """
- DolphinScheduler API 客户端最小封装。
- 职责:
- - 读 conf/ds.ini 拿 base_url + token
- - requests.Session 注入 token header
- - 通用 get / post,2xx 返回 json,非 2xx 抛 RuntimeError
- - 不预封任何具体 endpoint,调用方按需拼 path
- """
- import os
- from configparser import ConfigParser
- from typing import Any, Dict, Optional
- import requests
- _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
- class DSClient:
- def __init__(self, conf_path: Optional[str] = None):
- path = conf_path or _DEFAULT_CONF_PATH
- if not os.path.isfile(path):
- raise RuntimeError('DS 配置文件不存在:' + path)
- cp = ConfigParser()
- cp.read(path, encoding='utf-8')
- self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
- self.token = cp.get('dolphinscheduler', 'token')
- self.session = requests.Session()
- self.session.headers['token'] = self.token
- def _url(self, path: str) -> str:
- return self.base_url + '/' + path.lstrip('/')
- def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
- resp = self.session.get(self._url(path), params=params)
- if resp.status_code // 100 != 2:
- raise RuntimeError('GET {} failed rc={}: {}'.format(
- path, resp.status_code, resp.text))
- try:
- return resp.json()
- except ValueError:
- raise RuntimeError('GET {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
- path, resp.text[:200]))
- def post(self, path: str, json_body: Optional[Dict[str, Any]] = None) -> Any:
- resp = self.session.post(self._url(path), json=json_body)
- if resp.status_code // 100 != 2:
- raise RuntimeError('POST {} failed rc={}: {}'.format(
- path, resp.status_code, resp.text))
- try:
- return resp.json()
- except ValueError:
- raise RuntimeError('POST {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
- path, resp.text[:200]))
|