Browse Source

feat(datax): 新增 runner 模块(单任务 ssh/本机执行)

run_job: 生成 json(透过老 bin/datax-job-config-generator.py 薄壳)
+ 执行 datax.py;本机 subprocess.run 直跑,远端 ssh worker_host '<cmd>'
stdout/stderr 继承父进程,tee/重定向由上游 batch 层控制

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 tuần trước cách đây
mục cha
commit
321d22f6b7
1 tập tin đã thay đổi với 81 bổ sung0 xóa
  1. 81 0
      dw_base/datax/runner.py

+ 81 - 0
dw_base/datax/runner.py

@@ -0,0 +1,81 @@
+# -*- coding:utf-8 -*-
+"""
+DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。
+
+流程(对齐老脚本 generate_job_config + run_single_datax_job):
+1. 生成 json(调 bin/datax-job-config-generator.py,本机 subprocess / 远端 ssh)
+2. 执行 datax.py(同样本机 / 远端分发)
+
+注:本版继续透过老 shim bin/datax-job-config-generator.py 生成 json(批次 4 删老
+脚本时再改为直接 import JobConfigGenerator);worker 选择由 worker.select_worker 提供,
+本模块只接收最终 worker_host。
+"""
+import os
+import shlex
+import subprocess
+
+from dw_base.datax import path_utils
+
+
+def run_job(ini_path: str,
+            start_date: str,
+            stop_date: str,
+            worker_host: str,
+            current_host: str,
+            base_dir: str,
+            python3_path: str,
+            datax_home: str,
+            skip_datax: bool = False) -> int:
+    """
+    单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。
+
+    Args:
+        ini_path: DataX ini 绝对路径
+        start_date / stop_date: yyyyMMdd
+        worker_host: 已由 worker.select_worker 决定的目标节点
+        current_host: hostname -s
+        base_dir: 项目根绝对路径
+        python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH)
+        datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME)
+        skip_datax: 只生成 json 不执行
+
+    Returns:
+        datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0
+
+    Raises:
+        RuntimeError: 生成 json 失败
+    """
+    json_path = path_utils.json_output_path(base_dir, ini_path)
+    os.makedirs(os.path.dirname(json_path), exist_ok=True)
+
+    gen_argv = [
+        python3_path, '-u',
+        os.path.join(base_dir, 'bin', 'datax-job-config-generator.py'),
+        '-c', ini_path,
+        '-start-date', start_date,
+        '-stop-date', stop_date,
+    ]
+    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host)
+    if gen_rc != 0:
+        raise RuntimeError('生成 DataX json 失败: ' + ini_path)
+
+    if skip_datax:
+        return 0
+
+    exec_argv = [
+        python3_path, '-u',
+        os.path.join(datax_home, 'bin', 'datax.py'),
+        json_path,
+    ]
+    return _run_local_or_remote(exec_argv, worker_host, current_host)
+
+
+def _run_local_or_remote(argv, worker_host: str, current_host: str) -> int:
+    """
+    本机 = subprocess.run 直跑;远端 = ssh worker_host '<cmd>' 传字符串。
+    stdout/stderr 继承父进程,由上游(batch 层)决定 tee / 重定向。
+    """
+    if worker_host == current_host:
+        return subprocess.run(argv).returncode
+    remote_cmd = ' '.join(shlex.quote(a) for a in argv)
+    return subprocess.run(['ssh', worker_host, remote_cmd]).returncode