# -*- coding:utf-8 -*-
"""Command-line entry point for the DolphinScheduler API client.

Usage:
    python -m dw_base.ds.cli get PATH [-p k=v ...] [-o FILE]
    python -m dw_base.ds.cli post PATH [-j JSON] [-o FILE]

Examples:
    python -m dw_base.ds.cli get /projects -p pageSize=10 -p pageNo=1
    python -m dw_base.ds.cli post /projects/123/process-definition/query -j '{}'
"""
import argparse
import json
import sys

from dw_base.ds.api import DSClient


def _parse_kv(items):
    """Turn a list of ``k=v`` strings into a dict.

    Each item is split on the first ``=`` only, so values may themselves
    contain ``=``. Items without any ``=`` are reported on stderr and
    skipped (best-effort, not fatal). When a key is repeated, the last
    occurrence wins.

    Args:
        items: Iterable of ``k=v`` strings, or ``None`` / empty.

    Returns:
        dict mapping each key to its (string) value.
    """
    parsed = {}
    for item in items or []:
        key, sep, value = item.partition('=')
        if not sep:
            sys.stderr.write('参数 -p 需要 k=v 格式,跳过:' + item + '\n')
            continue
        parsed[key] = value
    return parsed


def _build_parser():
    """Build the argument parser with the ``get`` and ``post`` sub-commands."""
    parser = argparse.ArgumentParser(prog='dw_base.ds.cli',
                                     description='DolphinScheduler API CLI')
    sub = parser.add_subparsers(dest='cmd')
    # Set after creation so a missing sub-command is rejected by argparse.
    sub.required = True

    g = sub.add_parser('get', help='HTTP GET')
    g.add_argument('path', help='API path,如 /projects')
    g.add_argument('-p', action='append', default=[], metavar='k=v',
                   help='query 参数(可多次)')
    g.add_argument('-o', dest='output', default=None, metavar='FILE',
                   help='写 JSON 到文件(不打 stdout)')

    p = sub.add_parser('post', help='HTTP POST')
    p.add_argument('path')
    p.add_argument('-j', dest='body', default=None, metavar='JSON',
                   help='JSON body')
    p.add_argument('-o', dest='output', default=None, metavar='FILE',
                   help='写 JSON 到文件(不打 stdout)')
    return parser


def main(argv=None):
    """Run the CLI: issue one GET or POST and emit the result as JSON.

    The result returned by ``DSClient.get`` / ``DSClient.post`` is dumped
    with ``json.dumps(..., ensure_ascii=False, indent=2)`` and either
    printed to stdout or, when ``-o FILE`` is given, written to that file
    as UTF-8 (with a trailing newline) while a short notice goes to stderr.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the original behaviour.

    Raises:
        SystemExit: On invalid arguments, including a ``-j`` value that is
            not valid JSON (exit status 2, via ``parser.error``).
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Validate the POST body before creating the client so a typo in -j
    # produces a usage error instead of a traceback. An omitted or empty
    # -j still means "no body" (None), as before.
    body = None
    if args.cmd == 'post' and args.body:
        try:
            body = json.loads(args.body)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            # Message kept in Chinese to match the CLI's other user-facing text.
            parser.error('参数 -j 不是合法的 JSON:{}'.format(exc))

    client = DSClient()
    if args.cmd == 'get':
        result = client.get(args.path, params=_parse_kv(args.p))
    else:
        result = client.post(args.path, json_body=body)

    text = json.dumps(result, ensure_ascii=False, indent=2)
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(text)
            f.write('\n')
        sys.stderr.write('[ds.cli] 已写到 {}\n'.format(args.output))
    else:
        print(text)


if __name__ == '__main__':
    main()