| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- # -*- coding:utf-8 -*-
- """
- DataX 入口侧的路径拼接工具(纯函数,无副作用)。
- 集中点:
- - job_name_from_ini: ini 路径 → job 名(去目录、去 .ini 扩展)
- - json_output_path: ini → json 作业配置输出绝对路径(扁平化 conf/datax-json/{job_name}.json)
- - log_path: 日志路径统一模板 {LOG_ROOT_DIR}/{module}/{dt}/{job_name}.log
- 对齐 kb/90 §2.5(JSON 路径扁平化)和 §7.2.1(日志路径)。
- """
- import os
- def job_name_from_ini(ini_path: str) -> str:
- """
- 从 ini 路径提取 job 名(basename 去 .ini 扩展)。
- >>> job_name_from_ini('/a/b/c/app_user_cert_info.ini')
- 'app_user_cert_info'
- >>> job_name_from_ini('jobs/raw/usr/xxx.ini')
- 'xxx'
- """
- basename = os.path.basename(ini_path)
- if basename.endswith('.ini'):
- basename = basename[:-len('.ini')]
- return basename
- def json_output_path(base_dir: str, ini_path: str) -> str:
- """
- 按 ini 推导 DataX json 作业配置的输出绝对路径。
- 扁平化:{base_dir}/conf/datax-json/{job_name}.json。
- """
- return os.path.join(base_dir, 'conf', 'datax-json', job_name_from_ini(ini_path) + '.json')
- def log_path(log_root_dir: str, module: str, dt: str, job_name: str) -> str:
- """
- 日志路径:{log_root_dir}/{module}/{dt}/{job_name}.log
- """
- return os.path.join(log_root_dir, module, dt, job_name + '.log')
|