| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
"""
start_workflow(project_name, process_name, start_params, token):
project_name: project name
process_name: workflow name
start_params: workflow parameters (dict)
token: value sent in the "token" request header
"""
- import json
- import requests
- import yaml
- import re
- import os
- import logging
# Log INFO and above, each record prefixed with a timestamp and its level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Config file locations; each can be overridden through an environment variable.
config_path = os.environ.get("CONFIG_PATH", "dw_base/ds/config/base_config.yaml")
process_code_path = os.environ.get("PROCESS_CODE_PATH", "dw_base/ds/config/process_code.yaml")

# Everything from the first "tendata-warehouse" in this file's absolute path
# onward is replaced by "tendata-warehouse/". If the marker does not occur,
# the path is left unchanged.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse/", abspath)
def load_yaml_config(path):
    """Load a YAML file and return its parsed content.

    Failures are logged instead of raised, so the caller receives an empty
    dict rather than an exception.

    Args:
        path: Filesystem path of the YAML file to read.

    Returns:
        The object produced by ``yaml.safe_load``, or ``{}`` when the file is
        missing, is an empty document, or cannot be read or parsed.
    """
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except FileNotFoundError:
        logging.error(f"配置文件 {path} 未找到")
        return {}
    except Exception as e:
        logging.error(f"读取配置文件时发生错误: {e}")
        return {}
    # safe_load yields None for an empty document; normalise it so callers
    # can keep calling .get() on the result, as they do for the error paths.
    return {} if config is None else config
def init_params(config):
    """Build request parameters from the ``request_params`` config section.

    Args:
        config: Parsed base configuration. Its ``request_params`` entry, when
            present, is expected to be a mapping.

    Returns:
        A new dict in which every value has been converted with ``str``;
        ``None`` values become the empty string. A missing or null section
        yields ``{}``. The input mapping is not modified.
    """
    # `or {}` covers both an absent key and an explicit null in the YAML,
    # which previously raised AttributeError on `.keys()`.
    raw_params = config.get("request_params") or {}
    return {
        key: "" if value is None else str(value)
        for key, value in raw_params.items()
    }
def send_request(url, headers, params, timeout=30):
    """POST to ``url`` and log the decoded JSON response.

    Nothing is returned. Any ``requests.exceptions.RequestException``
    (including the one raised by ``raise_for_status`` for an error status)
    is logged rather than propagated.

    Args:
        url: Target URL.
        headers: HTTP headers to send.
        params: Query-string parameters; when ``None`` no request is made.
        timeout: Seconds to wait for the server. Without a timeout,
            ``requests`` can block indefinitely on an unresponsive server.
    """
    if params is None:
        return
    try:
        result = requests.post(url=url, headers=headers, params=params, timeout=timeout)
        result.raise_for_status()
        logging.info(result.json())
    except requests.exceptions.RequestException as e:
        logging.error(f"请求失败: {e}")
def get_request_base(project_name, process_name, token):
    """Assemble the URL, headers and query parameters for a start request.

    Both YAML files are loaded from paths formed by appending the configured
    relative paths to ``root_path``. Name lookups use ``dict.get``, so a
    project or process name that is absent from the config ends up as the
    text ``None`` in the URL and parameters.

    Args:
        project_name: Key used to look up the project code.
        process_name: Key used to look up the process code within the project.
        token: Value placed in the ``token`` request header.

    Returns:
        A ``(url, headers, params)`` tuple.
    """
    base_settings: dict = load_yaml_config(root_path + config_path)
    api_root = base_settings.get("base_url")
    request_headers = {"token": token}
    query = init_params(base_settings)

    code_table: dict = load_yaml_config(root_path + process_code_path)
    project_code = code_table.get("project_code").get(project_name)
    endpoint = f"{api_root}/projects/{project_code}/executors/start-process-instance"
    # Process codes are nested one level deeper: project name -> process name.
    project_processes = code_table.get("process_code").get(project_name)
    process_code = project_processes.get(process_name)

    query["project_code"] = str(project_code)
    query["processDefinitionCode"] = str(process_code)
    return endpoint, request_headers, query
def start_workflow(project_name, process_name, start_params, token):
    """Send the request that starts a workflow.

    Args:
        project_name: Project name used to resolve the project code.
        process_name: Workflow name used to resolve the process code.
        start_params: Workflow parameters; serialised with ``json.dumps`` and
            sent as the ``startParams`` parameter.
        token: Value sent in the ``token`` request header.
    """
    request_url, request_headers, query = get_request_base(
        project_name, process_name, token
    )
    serialized_start_params = json.dumps(start_params)
    query["startParams"] = serialized_start_params
    send_request(request_url, request_headers, query)
|