# -*- coding:utf-8 -*- """ DataX 入口侧的路径拼接工具(纯函数,无副作用)。 集中点: - job_name_from_ini: ini 路径 → job 名(去目录、去 .ini 扩展) - json_output_path: ini → json 作业配置输出绝对路径(扁平化 conf/datax-json/{job_name}.json) - log_path: 日志路径统一模板 {LOG_ROOT_DIR}/{module}/{dt}/{job_name}.log 对齐 kb/90 §2.5(JSON 路径扁平化)和 §7.2.1(日志路径)。 """ import os def job_name_from_ini(ini_path: str) -> str: """ 从 ini 路径提取 job 名(basename 去 .ini 扩展)。 >>> job_name_from_ini('/a/b/c/app_user_cert_info.ini') 'app_user_cert_info' >>> job_name_from_ini('jobs/raw/usr/xxx.ini') 'xxx' """ basename = os.path.basename(ini_path) if basename.endswith('.ini'): basename = basename[:-len('.ini')] return basename def json_output_path(base_dir: str, ini_path: str) -> str: """ 按 ini 推导 DataX json 作业配置的输出绝对路径。 扁平化:{base_dir}/conf/datax-json/{job_name}.json。 """ return os.path.join(base_dir, 'conf', 'datax-json', job_name_from_ini(ini_path) + '.json') def log_path(log_root_dir: str, module: str, dt: str, job_name: str) -> str: """ 日志路径:{log_root_dir}/{module}/{dt}/{job_name}.log """ return os.path.join(log_root_dir, module, dt, job_name + '.log')