api.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. # -*- coding:utf-8 -*-
  2. """
  3. DolphinScheduler API 客户端最小封装。
  4. 职责:
  5. - 读 conf/ds.ini 拿 base_url + token + 项目码映射
  6. - requests.Session 注入 token header
  7. - 通用 get / post / put:2xx 返 JSON;非 2xx 抛 RuntimeError;
  8. expect_business_ok=True 时还会检查 DS 业务级 code == 0
  9. - project_code(name) 查 conf [projects] 段,避免 workspace 脚本硬编 magic number
  10. - 不预封任何具体 endpoint,调用方按需拼 path
  11. DS 业务级响应约定:
  12. - 2xx HTTP + body 形如 {"code": 0, "data": {...}, "msg": "success"}
  13. - code != 0 表示业务级失败(参数错 / 权限 / 资源冲突);HTTP 仍 2xx
  14. - expect_business_ok=True 把这两层失败收口到一个 RuntimeError 里
  15. 含 method + path + 完整 response,caller 不再需要自己写 post_check helper
  16. DS 3.4 PUT /workflow-definition/{code} 必传字段(实证踩坑):
  17. - taskDefinitionJson 中每个 task 字典必须含 version 字段——DS 端用 (code, version) 识别
  18. task identity;缺则被当新建 task 处理,原 task 被丢弃
  19. - taskRelationJson 中 preTaskVersion / postTaskVersion 必须用任务真实 version,不能硬编 0
  20. - locations 字段必须传 [{taskCode, x, y}, ...] 含全部 task 坐标,否则 UI 不渲染节点
  21. (taskDefinitionList 数据上有,但 DAG 看不到)
  22. 工作流 body 拼装:dw_base/ds/builders.py
  23. 工作流查询 / 状态切换:dw_base/ds/workflow.py
  24. 完整使用样板:workspace/20260507/extend_raw_to_ods_workflows.py
  25. """
  26. import os
  27. from configparser import ConfigParser
  28. from typing import Any, Dict, Optional
  29. import requests
  30. _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  31. _DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
  32. class DSClient:
  33. def __init__(self, conf_path=None):
  34. # type: (Optional[str]) -> None
  35. path = conf_path or _DEFAULT_CONF_PATH
  36. if not os.path.isfile(path):
  37. raise RuntimeError('DS 配置文件不存在:' + path)
  38. cp = ConfigParser()
  39. cp.read(path, encoding='utf-8')
  40. self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
  41. self.token = cp.get('dolphinscheduler', 'token')
  42. # [projects] 段可选(向后兼容旧 ds.ini);存在则读为 {name: code} dict
  43. self._projects = {} # type: Dict[str, int]
  44. if cp.has_section('projects'):
  45. for name, code_str in cp.items('projects'):
  46. self._projects[name] = int(code_str)
  47. self.session = requests.Session()
  48. self.session.headers['token'] = self.token
  49. def _url(self, path):
  50. # type: (str) -> str
  51. return self.base_url + '/' + path.lstrip('/')
  52. def project_code(self, name):
  53. # type: (str) -> int
  54. """查 conf/ds.ini [projects] 段拿项目码。
  55. 未注册抛 RuntimeError 提示加 conf。避免 workspace 脚本硬编
  56. magic number;项目码不算高敏,入库 ds.ini。
  57. """
  58. if name not in self._projects:
  59. raise RuntimeError(
  60. "项目码未注册:'{}';conf/ds.ini [projects] 段加 '{} = <code>'".format(
  61. name, name))
  62. return self._projects[name]
  63. def _request(self, method, path,
  64. params=None, json_body=None, form_data=None,
  65. expect_business_ok=False):
  66. # type: (str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  67. """get / post / put 共用。3 层错误统一收口:
  68. 1. 非 2xx HTTP → RuntimeError 含 method/path/rc/text
  69. 2. 2xx + 非 JSON(path 错被 SPA fallback 返 HTML)→ RuntimeError 含 raw text 前 200 字符
  70. 3. expect_business_ok=True 且 dict 响应 code != 0 → RuntimeError 含完整 response
  71. """
  72. if json_body is not None and form_data is not None:
  73. raise ValueError('json_body 与 form_data 互斥')
  74. kwargs = {}
  75. if params is not None:
  76. kwargs['params'] = params
  77. if form_data is not None:
  78. kwargs['data'] = form_data
  79. elif json_body is not None:
  80. kwargs['json'] = json_body
  81. resp = self.session.request(method, self._url(path), **kwargs)
  82. if resp.status_code // 100 != 2:
  83. raise RuntimeError('{} {} failed rc={}: {}'.format(
  84. method, path, resp.status_code, resp.text))
  85. try:
  86. data = resp.json()
  87. except ValueError:
  88. raise RuntimeError('{} {} 返回非 JSON(可能 path 错被 SPA fallback): {}'.format(
  89. method, path, resp.text[:200]))
  90. if expect_business_ok and isinstance(data, dict) and data.get('code') != 0:
  91. raise RuntimeError('{} {} 业务码 != 0:{}'.format(method, path, data))
  92. return data
  93. def get(self, path, params=None, expect_business_ok=False):
  94. # type: (str, Optional[Dict[str, Any]], bool) -> Any
  95. return self._request('GET', path, params=params,
  96. expect_business_ok=expect_business_ok)
  97. def post(self, path, json_body=None, form_data=None, expect_business_ok=False):
  98. # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  99. return self._request('POST', path,
  100. json_body=json_body, form_data=form_data,
  101. expect_business_ok=expect_business_ok)
  102. def put(self, path, json_body=None, form_data=None, expect_business_ok=False):
  103. # type: (str, Optional[Dict[str, Any]], Optional[Dict[str, Any]], bool) -> Any
  104. return self._request('PUT', path,
  105. json_body=json_body, form_data=form_data,
  106. expect_business_ok=expect_business_ok)