api.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding:utf-8 -*-
  2. """
  3. DolphinScheduler API 客户端最小封装。
  4. 职责:
  5. - 读 conf/ds.ini 拿 base_url + token
  6. - requests.Session 注入 token header
  7. - 通用 get / post / put:2xx 返 JSON;非 2xx 抛 RuntimeError;
  8. expect_business_ok=True 时还会检查 DS 业务级 code == 0
  9. - 不预封任何具体 endpoint,调用方按需拼 path
  10. DS 业务级响应约定:
  11. - 2xx HTTP + body 形如 {"code": 0, "data": {...}, "msg": "success"}
  12. - code != 0 表示业务级失败(参数错 / 权限 / 资源冲突);HTTP 仍 2xx
  13. - expect_business_ok=True 把这两层失败收口到一个 RuntimeError 里
  14. 含 method + path + 完整 response,caller 不再需要自己写 post_check helper
  15. DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
  16. - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
  17. task identity;缺则被当新建 task 处理,原 task 被丢弃
  18. - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
  19. - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
  20. (taskDefinitionList 数据上有,但 DAG 看不到)
  21. 工作流 body 拼装:dw_base/ds/builders.py
  22. 工作流查询 / 状态切换:dw_base/ds/workflow.py
  23. 完整使用样板:workspace/20260507/extend_raw_to_ods_workflows.py
  24. """
  25. import os
  26. from configparser import ConfigParser
  27. from typing import Any, Dict, Optional
  28. import requests
  29. _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  30. _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
  31. class DSClient:
  32. def __init__(self, conf_path=None):
  33. # type: (Optional[str]) -> None
  34. path = conf_path or _DEFAULT_CONF_PATH
  35. if not os.path.isfile(path):
  36. raise RuntimeError('DS 配置文件不存在:' + path)
  37. cp = ConfigParser()
  38. cp.read(path, encoding='utf-8')
  39. self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
  40. self.token = cp.get('dolphinscheduler', 'token')
  41. self.session = requests.Session()
  42. self.session.headers['token'] = self.token
  43. def _url(self, path):
  44. # type: (str) -> str
  45. return self.base_url + '/' + path.lstrip('/')
  46. def _request(self, method, path,
  47. params=None, json_body=None, form_data=None,
  48. expect_business_ok=False):
  49. # type: (str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  50. """get / post / put 共用。3 层错误统一收口:
  51. 1. 非 2xx HTTP → RuntimeError 含 method/path/rc/text
  52. 2. 2xx + 非 JSON(path 错被 SPA fallback 返 HTML)→ RuntimeError 含 raw text 前 200 字符
  53. 3. expect_business_ok=True 且 dict 响应 code != 0 → RuntimeError 含完整 response
  54. """
  55. if json_body is not None and form_data is not None:
  56. raise ValueError('json_body 与 form_data 互斥')
  57. kwargs = {}
  58. if params is not None:
  59. kwargs['params'] = params
  60. if form_data is not None:
  61. kwargs['data'] = form_data
  62. elif json_body is not None:
  63. kwargs['json'] = json_body
  64. resp = self.session.request(method, self._url(path), **kwargs)
  65. if resp.status_code // 100 != 2:
  66. raise RuntimeError('{} {} failed rc={}: {}'.format(
  67. method, path, resp.status_code, resp.text))
  68. try:
  69. data = resp.json()
  70. except ValueError:
  71. raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
  72. method, path, resp.text[:200]))
  73. if expect_business_ok and isinstance(data, dict) and data.get('code') != 0:
  74. raise RuntimeError('{} {} 业务码 != 0:{}'.format(method, path, data))
  75. return data
  76. def get(self, path, params=None, expect_business_ok=False):
  77. # type: (str, Optional[Dict[str, Any]], bool) -> Any
  78. return self._request('GET', path, params=params,
  79. expect_business_ok=expect_business_ok)
  80. def post(self, path, json_body=None, form_data=None, expect_business_ok=False):
  81. # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  82. return self._request('POST', path,
  83. json_body=json_body, form_data=form_data,
  84. expect_business_ok=expect_business_ok)
  85. def put(self, path, json_body=None, form_data=None, expect_business_ok=False):
  86. # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  87. return self._request('PUT', path,
  88. json_body=json_body, form_data=form_data,
  89. expect_business_ok=expect_business_ok)