""" start_workflow(project_name, process_name, start_params): project_name: 项目名称 process_name: 工作流名称 start_params: 工作流参数(dict) """ import json import requests import yaml import re import os import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') abspath = os.path.abspath(__file__) root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse/", abspath) config_path = os.getenv("CONFIG_PATH", "dw_base/ds/config/base_config.yaml") process_code_path = os.getenv("PROCESS_CODE_PATH", "dw_base/ds/config/process_code.yaml") def load_yaml_config(path): try: with open(path, 'r') as file: config = yaml.safe_load(file) return config except FileNotFoundError: logging.error(f"配置文件 {path} 未找到") return {} except Exception as e: logging.error(f"读取配置文件时发生错误: {e}") return {} def init_params(config): params: dict = config.get("request_params") for key in params.keys(): if params[key] is None: params[key] = "" else: params[key] = str(params.get(key)) return params def send_request(url, headers, params): if params is None: return try: result = requests.post(url=url, headers=headers, params=params) result.raise_for_status() logging.info(result.json()) except requests.exceptions.RequestException as e: logging.error(f"请求失败: {e}") def get_request_base(project_name, process_name, token): base_config: dict = load_yaml_config(root_path + config_path) base_url = base_config.get("base_url") headers = { "token": token } params = init_params(base_config) process_code_config: dict = load_yaml_config(root_path + process_code_path) project_code = process_code_config.get("project_code").get(project_name) url = f"{base_url}/projects/{project_code}/executors/start-process-instance" process_code = process_code_config.get("process_code").get(project_name).get(process_name) params["project_code"] = str(project_code) params["processDefinitionCode"] = str(process_code) return url, headers, params def start_workflow(project_name, process_name, start_params, token): url, headers, params = get_request_base(project_name, process_name, token) params["startParams"] = json.dumps(start_params) send_request(url, headers, params)