api.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. # -*- coding:utf-8 -*-
  2. """
  3. DolphinScheduler API 客户端最小封装。
  4. 职责:
  5. - 读 conf/ds.ini 拿 base_url + token
  6. - requests.Session 注入 token header
  7. - 通用 get / post,2xx 返回 json,非 2xx 抛 RuntimeError
  8. - 不预封任何具体 endpoint,调用方按需拼 path
  9. DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
  10. - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
  11. task identity;缺则被当新建 task 处理,原 task 被丢弃
  12. - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
  13. - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
  14. (taskDefinitionList 数据上有,但 DAG 看不到)
  15. 实现参考:workspace/20260507/extend_raw_to_ods_workflows.py(含 verify_checklist 校验)
  16. """
  17. import os
  18. from configparser import ConfigParser
  19. from typing import Any, Dict, Optional
  20. import requests
  21. _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  22. _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
  23. class DSClient:
  24. def __init__(self, conf_path: Optional[str] = None):
  25. path = conf_path or _DEFAULT_CONF_PATH
  26. if not os.path.isfile(path):
  27. raise RuntimeError('DS 配置文件不存在:' + path)
  28. cp = ConfigParser()
  29. cp.read(path, encoding='utf-8')
  30. self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
  31. self.token = cp.get('dolphinscheduler', 'token')
  32. self.session = requests.Session()
  33. self.session.headers['token'] = self.token
  34. def _url(self, path: str) -> str:
  35. return self.base_url + '/' + path.lstrip('/')
  36. def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
  37. resp = self.session.get(self._url(path), params=params)
  38. if resp.status_code // 100 != 2:
  39. raise RuntimeError('GET {} failed rc={}: {}'.format(
  40. path, resp.status_code, resp.text))
  41. try:
  42. return resp.json()
  43. except ValueError:
  44. raise RuntimeError('GET {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
  45. path, resp.text[:200]))
  46. def post(self, path: str, json_body: Optional[Dict[str, Any]] = None,
  47. form_data: Optional[Dict[str, Any]] = None) -> Any:
  48. if json_body is not None and form_data is not None:
  49. raise ValueError('json_body 与 form_data 互斥')
  50. if form_data is not None:
  51. resp = self.session.post(self._url(path), data=form_data)
  52. else:
  53. resp = self.session.post(self._url(path), json=json_body)
  54. if resp.status_code // 100 != 2:
  55. raise RuntimeError('POST {} failed rc={}: {}'.format(
  56. path, resp.status_code, resp.text))
  57. try:
  58. return resp.json()
  59. except ValueError:
  60. raise RuntimeError('POST {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
  61. path, resp.text[:200]))