Bladeren bron

fix(datax): runner 注入 PYTHONPATH/远端 cd 让 python -m dw_base.datax.cli 能找包

subprocess 新起 python 进程时 sys.path 不含项目根(老 shim 自己 sys.path.append
绕过了,新 cli 走 -m 机制必须环境层注入):
- 本机:subprocess.run env 里 PYTHONPATH 前置 base_dir
- 远端:ssh worker 'cd base_dir && <cmd>' 让 cwd 和 -m 定位生效

冒烟 3 跑起来时报 ModuleNotFoundError: No module named 'dw_base' 暴露的缺陷。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 week geleden
bovenliggende
commit
5626e90cef
2 gewijzigde bestanden met toevoegingen van 30 en 7 verwijderingen
  1. 12 6
      dw_base/datax/runner.py
  2. 18 1
      tests/unit/datax/test_runner.py

+ 12 - 6
dw_base/datax/runner.py

@@ -58,7 +58,8 @@ def run_job(ini_path: str,
         '-start-date', start_date,
         '-stop-date', stop_date,
     ]
-    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, stdout=stdout, stderr=stderr)
+    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
+                                  stdout=stdout, stderr=stderr)
     if gen_rc != 0:
         raise RuntimeError('生成 DataX json 失败: ' + ini_path)
 
@@ -70,16 +71,21 @@ def run_job(ini_path: str,
         os.path.join(datax_home, 'bin', 'datax.py'),
         json_path,
     ]
-    return _run_local_or_remote(exec_argv, worker_host, current_host, stdout=stdout, stderr=stderr)
+    return _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
+                                stdout=stdout, stderr=stderr)
 
 
-def _run_local_or_remote(argv, worker_host: str, current_host: str,
+def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
                          stdout=None, stderr=None) -> int:
     """
-    本机 = subprocess.run 直跑;远端 = ssh worker_host '<cmd>' 传字符串。
+    本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir(让 python -m dw_base.datax.cli 能找到包)
+    远端:ssh worker_host 'cd <base_dir> && <cmd>'(cwd 为项目根,同样保证 -m 能找到 dw_base)
     stdout/stderr 默认继承父进程;batch 层可传文件句柄做每任务独立日志。
     """
     if worker_host == current_host:
-        return subprocess.run(argv, stdout=stdout, stderr=stderr).returncode
-    remote_cmd = ' '.join(shlex.quote(a) for a in argv)
+        env = os.environ.copy()
+        existing = env.get('PYTHONPATH', '')
+        env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
+        return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
+    remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
     return subprocess.run(['ssh', worker_host, remote_cmd], stdout=stdout, stderr=stderr).returncode

+ 18 - 1
tests/unit/datax/test_runner.py

@@ -42,11 +42,28 @@ def test_run_job_remote_uses_ssh(mock_run, tmp_path):
     first_argv = mock_run.call_args_list[0][0][0]
     assert first_argv[0] == 'ssh'
     assert first_argv[1] == 'cdhnode02'
-    # remote_cmd 是单字符串参数
+    # remote_cmd 是单字符串参数,cd base_dir && <cmd>
+    assert first_argv[2].startswith('cd ')
     assert 'dw_base.datax.cli' in first_argv[2]
     assert 'gen-json' in first_argv[2]
 
 
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
+    """本机 subprocess 必须带 PYTHONPATH=base_dir 让 python -m 能找到 dw_base 包。"""
+    mock_run.return_value = _RC(0)
+    run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhmaster02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+    )
+    call_kwargs = mock_run.call_args_list[0][1]
+    assert 'env' in call_kwargs
+    assert str(tmp_path) in call_kwargs['env']['PYTHONPATH']
+
+
 @patch('dw_base.datax.runner.subprocess.run')
 def test_run_job_gen_failure_raises(mock_run, tmp_path):
     mock_run.return_value = _RC(1)