Przeglądaj źródła

feat(ds): 加 DS API CLI 最小骨架

conf/ds.ini 存 base_url + token;dw_base/ds/api.py 通用 GET/POST;
dw_base/ds/cli.py 提供 python -m 命令行入口,不封具体 endpoint。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 5 dni temu
rodzic
commit
cd67785df4
4 zmienionych plików z 115 dodań i 0 usunięć
  1. 9 0
      conf/ds.ini
  2. 0 0
      dw_base/ds/__init__.py
  3. 49 0
      dw_base/ds/api.py
  4. 57 0
      dw_base/ds/cli.py

+ 9 - 0
conf/ds.ini

@@ -0,0 +1,9 @@
+; DolphinScheduler API 接入配置
+; 消费点:dw_base/ds/api.py:DSClient
+;
+; base_url:DS 3.4.1 默认 API 端口 12345 + path /dolphinscheduler
+; token:admin 令牌,权限顶级;后续按需迁 datasource/ds/ 不入库
+
+[dolphinscheduler]
+base_url = http://100.64.0.61:12345/dolphinscheduler
+token = 921755ce53b15c74d417978802eda884

+ 0 - 0
dw_base/ds/__init__.py


+ 49 - 0
dw_base/ds/api.py

@@ -0,0 +1,49 @@
+# -*- coding:utf-8 -*-
+"""
+DolphinScheduler API 客户端最小封装。
+
+职责:
+- 读 conf/ds.ini 拿 base_url + token
+- requests.Session 注入 token header
+- 通用 get / post,2xx 返回 json,非 2xx 抛 RuntimeError
+- 不预封任何具体 endpoint,调用方按需拼 path
+"""
+import os
+from configparser import ConfigParser
+from typing import Any, Dict, Optional
+
+import requests
+
+
+_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+_DEFAULT_CONF_PATH = os.path.join(_PROJECT_ROOT, 'conf', 'ds.ini')
+
+
+class DSClient:
+    def __init__(self, conf_path: Optional[str] = None):
+        path = conf_path or _DEFAULT_CONF_PATH
+        if not os.path.isfile(path):
+            raise RuntimeError('DS 配置文件不存在:' + path)
+        cp = ConfigParser()
+        cp.read(path, encoding='utf-8')
+        self.base_url = cp.get('dolphinscheduler', 'base_url').rstrip('/')
+        self.token = cp.get('dolphinscheduler', 'token')
+        self.session = requests.Session()
+        self.session.headers['token'] = self.token
+
+    def _url(self, path: str) -> str:
+        return self.base_url + '/' + path.lstrip('/')
+
+    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
+        resp = self.session.get(self._url(path), params=params)
+        if resp.status_code // 100 != 2:
+            raise RuntimeError('GET {} failed rc={}: {}'.format(
+                path, resp.status_code, resp.text))
+        return resp.json()
+
+    def post(self, path: str, json_body: Optional[Dict[str, Any]] = None) -> Any:
+        resp = self.session.post(self._url(path), json=json_body)
+        if resp.status_code // 100 != 2:
+            raise RuntimeError('POST {} failed rc={}: {}'.format(
+                path, resp.status_code, resp.text))
+        return resp.json()

+ 57 - 0
dw_base/ds/cli.py

@@ -0,0 +1,57 @@
+# -*- coding:utf-8 -*-
+"""
+DolphinScheduler API CLI 入口。
+
+用法:
+  python -m dw_base.ds.cli get <path> [-p k=v ...]
+  python -m dw_base.ds.cli post <path> [-j '<json-body>']
+
+示例:
+  python -m dw_base.ds.cli get /projects -p pageSize=10 -p pageNo=1
+  python -m dw_base.ds.cli post /projects/123/process-definition/query -j '{}'
+"""
+import argparse
+import json
+import sys
+
+from dw_base.ds.api import DSClient
+
+
+def _parse_kv(items):
+    d = {}
+    for it in items or []:
+        if '=' not in it:
+            sys.stderr.write('参数 -p 需要 k=v 格式,跳过:' + it + '\n')
+            continue
+        k, v = it.split('=', 1)
+        d[k] = v
+    return d
+
+
+def main():
+    parser = argparse.ArgumentParser(prog='dw_base.ds.cli', description='DolphinScheduler API CLI')
+    sub = parser.add_subparsers(dest='cmd')
+    sub.required = True
+
+    g = sub.add_parser('get', help='HTTP GET')
+    g.add_argument('path', help='API path,如 /projects')
+    g.add_argument('-p', action='append', default=[], metavar='k=v', help='query 参数(可多次)')
+
+    p = sub.add_parser('post', help='HTTP POST')
+    p.add_argument('path')
+    p.add_argument('-j', dest='body', default=None, metavar='JSON', help='JSON body')
+
+    args = parser.parse_args()
+    client = DSClient()
+
+    if args.cmd == 'get':
+        result = client.get(args.path, params=_parse_kv(args.p))
+    else:
+        body = json.loads(args.body) if args.body else None
+        result = client.post(args.path, json_body=body)
+
+    print(json.dumps(result, ensure_ascii=False, indent=2))
+
+
+if __name__ == '__main__':
+    main()