ds_start_workflow.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. """
  2. start_workflow(project_name, process_name, start_params):
  3. project_name: 项目名称
  4. process_name: 工作流名称
  5. start_params: 工作流参数(dict)
  6. """
  7. import json
  8. import requests
  9. import yaml
  10. import re
  11. import os
  12. import logging
  13. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  14. abspath = os.path.abspath(__file__)
  15. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse/", abspath)
  16. config_path = os.getenv("CONFIG_PATH", "dw_base/ds/config/base_config.yaml")
  17. process_code_path = os.getenv("PROCESS_CODE_PATH", "dw_base/ds/config/process_code.yaml")
  18. def load_yaml_config(path):
  19. try:
  20. with open(path, 'r') as file:
  21. config = yaml.safe_load(file)
  22. return config
  23. except FileNotFoundError:
  24. logging.error(f"配置文件 {path} 未找到")
  25. return {}
  26. except Exception as e:
  27. logging.error(f"读取配置文件时发生错误: {e}")
  28. return {}
  29. def init_params(config):
  30. params: dict = config.get("request_params")
  31. for key in params.keys():
  32. if params[key] is None:
  33. params[key] = ""
  34. else:
  35. params[key] = str(params.get(key))
  36. return params
  37. def send_request(url, headers, params):
  38. if params is None:
  39. return
  40. try:
  41. result = requests.post(url=url, headers=headers, params=params)
  42. result.raise_for_status()
  43. logging.info(result.json())
  44. except requests.exceptions.RequestException as e:
  45. logging.error(f"请求失败: {e}")
  46. def get_request_base(project_name, process_name, token):
  47. base_config: dict = load_yaml_config(root_path + config_path)
  48. base_url = base_config.get("base_url")
  49. headers = {
  50. "token": token
  51. }
  52. params = init_params(base_config)
  53. process_code_config: dict = load_yaml_config(root_path + process_code_path)
  54. project_code = process_code_config.get("project_code").get(project_name)
  55. url = f"{base_url}/projects/{project_code}/executors/start-process-instance"
  56. process_code = process_code_config.get("process_code").get(project_name).get(process_name)
  57. params["project_code"] = str(project_code)
  58. params["processDefinitionCode"] = str(process_code)
  59. return url, headers, params
  60. def start_workflow(project_name, process_name, start_params, token):
  61. url, headers, params = get_request_base(project_name, process_name, token)
  62. params["startParams"] = json.dumps(start_params)
  63. send_request(url, headers, params)