# -*- coding:utf-8 -*- """ DolphinScheduler API CLI 入口。 用法: python -m dw_base.ds.cli get [-p k=v ...] python -m dw_base.ds.cli post [-j ''] 示例: python -m dw_base.ds.cli get /projects -p pageSize=10 -p pageNo=1 python -m dw_base.ds.cli post /projects/123/process-definition/query -j '{}' """ import argparse import json import sys from dw_base.ds.api import DSClient def _parse_kv(items): d = {} for it in items or []: if '=' not in it: sys.stderr.write('参数 -p 需要 k=v 格式,跳过:' + it + '\n') continue k, v = it.split('=', 1) d[k] = v return d def main(): parser = argparse.ArgumentParser(prog='dw_base.ds.cli', description='DolphinScheduler API CLI') sub = parser.add_subparsers(dest='cmd') sub.required = True g = sub.add_parser('get', help='HTTP GET') g.add_argument('path', help='API path,如 /projects') g.add_argument('-p', action='append', default=[], metavar='k=v', help='query 参数(可多次)') p = sub.add_parser('post', help='HTTP POST') p.add_argument('path') p.add_argument('-j', dest='body', default=None, metavar='JSON', help='JSON body') args = parser.parse_args() client = DSClient() if args.cmd == 'get': result = client.get(args.path, params=_parse_kv(args.p)) else: body = json.loads(args.body) if args.body else None result = client.post(args.path, json_body=body) print(json.dumps(result, ensure_ascii=False, indent=2)) if __name__ == '__main__': main()