# -*- coding:utf-8 -*- """ DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。 流程(对齐老脚本 generate_job_config + run_single_datax_job): 1. 生成 json(调 bin/datax-job-config-generator.py,本机 subprocess / 远端 ssh) 2. 执行 datax.py(同样本机 / 远端分发) 注:本版继续透过老 shim bin/datax-job-config-generator.py 生成 json(批次 4 删老 脚本时再改为直接 import JobConfigGenerator);worker 选择由 worker.select_worker 提供, 本模块只接收最终 worker_host。 """ import os import shlex import subprocess from dw_base.datax import path_utils def run_job(ini_path: str, start_date: str, stop_date: str, worker_host: str, current_host: str, base_dir: str, python3_path: str, datax_home: str, skip_datax: bool = False, stdout=None, stderr=None) -> int: """ 单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。 Args: ini_path: DataX ini 绝对路径 start_date / stop_date: yyyyMMdd worker_host: 已由 worker.select_worker 决定的目标节点 current_host: hostname -s base_dir: 项目根绝对路径 python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH) datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME) skip_datax: 只生成 json 不执行 stdout / stderr: 传给 subprocess.run 的重定向 target(文件句柄等); None = 继承父进程。batch 并行模式传文件句柄隔离每任务日志 Returns: datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0 Raises: RuntimeError: 生成 json 失败 """ json_path = path_utils.json_output_path(base_dir, ini_path) os.makedirs(os.path.dirname(json_path), exist_ok=True) gen_argv = [ python3_path, '-u', os.path.join(base_dir, 'bin', 'datax-job-config-generator.py'), '-c', ini_path, '-start-date', start_date, '-stop-date', stop_date, ] gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, stdout=stdout, stderr=stderr) if gen_rc != 0: raise RuntimeError('生成 DataX json 失败: ' + ini_path) if skip_datax: return 0 exec_argv = [ python3_path, '-u', os.path.join(datax_home, 'bin', 'datax.py'), json_path, ] return _run_local_or_remote(exec_argv, worker_host, current_host, stdout=stdout, stderr=stderr) def _run_local_or_remote(argv, worker_host: str, current_host: str, stdout=None, stderr=None) -> int: """ 本机 = subprocess.run 直跑;远端 = ssh worker_host '' 传字符串。 stdout/stderr 默认继承父进程;batch 层可传文件句柄做每任务独立日志。 """ if worker_host == current_host: return subprocess.run(argv, stdout=stdout, stderr=stderr).returncode remote_cmd = ' '.join(shlex.quote(a) for a in argv) return subprocess.run(['ssh', worker_host, remote_cmd], stdout=stdout, stderr=stderr).returncode