path_utils.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX 入口侧的路径拼接工具(纯函数,无副作用)。
  4. 集中点:
  5. - job_name_from_ini: ini 路径 → job 名(去目录、去 .ini 扩展)
  6. - json_output_path: ini → json 作业配置输出绝对路径(扁平化 conf/datax-json/{job_name}.json)
  7. - log_path: 日志路径统一模板 {LOG_ROOT_DIR}/{module}/{dt}/{job_name}.log
  8. 对齐 kb/90 §2.5(JSON 路径扁平化)和 §7.2.1(日志路径)。
  9. """
  10. import os
  11. def job_name_from_ini(ini_path: str) -> str:
  12. """
  13. 从 ini 路径提取 job 名(basename 去 .ini 扩展)。
  14. >>> job_name_from_ini('/a/b/c/app_user_cert_info.ini')
  15. 'app_user_cert_info'
  16. >>> job_name_from_ini('jobs/raw/usr/xxx.ini')
  17. 'xxx'
  18. """
  19. basename = os.path.basename(ini_path)
  20. if basename.endswith('.ini'):
  21. basename = basename[:-len('.ini')]
  22. return basename
  23. def json_output_path(base_dir: str, ini_path: str) -> str:
  24. """
  25. 按 ini 推导 DataX json 作业配置的输出绝对路径。
  26. 扁平化:{base_dir}/conf/datax-json/{job_name}.json。
  27. """
  28. return os.path.join(base_dir, 'conf', 'datax-json', job_name_from_ini(ini_path) + '.json')
  29. def log_path(log_root_dir: str, module: str, dt: str, job_name: str) -> str:
  30. """
  31. 日志路径:{log_root_dir}/{module}/{dt}/{job_name}.log
  32. """
  33. return os.path.join(log_root_dir, module, dt, job_name + '.log')