Browse Source

refactor(datax): runner 加 stdout/stderr 参数(供 batch 层重定向日志)

默认 None = 继承父进程(老行为不变)
batch 并行模式传文件句柄,实现每任务独立 log 文件(对齐老 > LOG 2>&1 &)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 week ago
parent
commit
8dcdc412b8
1 changed files with 12 additions and 7 deletions
  1. 12 7
      dw_base/datax/runner.py

+ 12 - 7
dw_base/datax/runner.py

@@ -25,7 +25,9 @@ def run_job(ini_path: str,
             base_dir: str,
             python3_path: str,
             datax_home: str,
-            skip_datax: bool = False) -> int:
+            skip_datax: bool = False,
+            stdout=None,
+            stderr=None) -> int:
     """
     单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。
 
@@ -38,6 +40,8 @@ def run_job(ini_path: str,
         python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH)
         datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME)
         skip_datax: 只生成 json 不执行
+        stdout / stderr: 传给 subprocess.run 的重定向 target(文件句柄等);
+                        None = 继承父进程。batch 并行模式传文件句柄隔离每任务日志
 
     Returns:
         datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0
@@ -55,7 +59,7 @@ def run_job(ini_path: str,
         '-start-date', start_date,
         '-stop-date', stop_date,
     ]
-    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host)
+    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, stdout=stdout, stderr=stderr)
     if gen_rc != 0:
         raise RuntimeError('生成 DataX json 失败: ' + ini_path)
 
@@ -67,15 +71,16 @@ def run_job(ini_path: str,
         os.path.join(datax_home, 'bin', 'datax.py'),
         json_path,
     ]
-    return _run_local_or_remote(exec_argv, worker_host, current_host)
+    return _run_local_or_remote(exec_argv, worker_host, current_host, stdout=stdout, stderr=stderr)
 
 
-def _run_local_or_remote(argv, worker_host: str, current_host: str) -> int:
+def _run_local_or_remote(argv, worker_host: str, current_host: str,
+                         stdout=None, stderr=None) -> int:
     """
     本机 = subprocess.run 直跑;远端 = ssh worker_host '<cmd>' 传字符串。
-    stdout/stderr 继承父进程,由上游(batch 层)决定 tee / 重定向
+    stdout/stderr 默认继承父进程;batch 层可传文件句柄做每任务独立日志
     """
     if worker_host == current_host:
-        return subprocess.run(argv).returncode
+        return subprocess.run(argv, stdout=stdout, stderr=stderr).returncode
     remote_cmd = ' '.join(shlex.quote(a) for a in argv)
-    return subprocess.run(['ssh', worker_host, remote_cmd]).returncode
+    return subprocess.run(['ssh', worker_host, remote_cmd], stdout=stdout, stderr=stderr).returncode