| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- # -*- coding:utf-8 -*-
- from unittest.mock import patch
- import pytest
- from dw_base.datax.runner import run_job
- class _RC:
- def __init__(self, code): self.returncode = code
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
    """With worker_host equal to current_host, run_job makes two subprocess calls.

    The first call's argv must start with the configured python3 path and
    contain both the ``dw_base.datax.cli`` module name and the ``gen-json``
    sub-command.
    """
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    # Two calls expected: generate the json, then execute datax.py.
    assert mock_run.call_count == 2

    # A recorded call unpacks into (positional args, keyword args);
    # the argv is the first positional argument.
    positional, _keywords = mock_run.call_args_list[0]
    gen_argv = positional[0]
    assert gen_argv[0] == '/usr/bin/python3'
    assert 'dw_base.datax.cli' in gen_argv
    assert 'gen-json' in gen_argv
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_remote_uses_ssh(mock_run, tmp_path):
    """With worker_host different from current_host, the first call goes through ssh.

    The first call's argv must be ``['ssh', <worker_host>, <remote command>, ...]``
    where the remote command starts with ``cd `` and mentions both
    ``dw_base.datax.cli`` and ``gen-json``.
    """
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhnode02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0

    positional, _keywords = mock_run.call_args_list[0]
    ssh_argv = positional[0]
    assert ssh_argv[0] == 'ssh'
    assert ssh_argv[1] == 'cdhnode02'

    # remote_cmd is a single string argument: cd base_dir && <cmd>
    remote_cmd = ssh_argv[2]
    assert remote_cmd.startswith('cd ')
    assert 'dw_base.datax.cli' in remote_cmd
    assert 'gen-json' in remote_cmd
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
    """The local subprocess must carry PYTHONPATH=base_dir so that ``python -m`` can find the dw_base package."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    run_job(**job_kwargs)

    # Inspect the keyword arguments of the first recorded subprocess call.
    _positional, keywords = mock_run.call_args_list[0]
    assert 'env' in keywords
    pythonpath = keywords['env']['PYTHONPATH']
    assert str(tmp_path) in pythonpath
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_gen_failure_raises(mock_run, tmp_path):
    """A non-zero return code from the mocked subprocess makes run_job raise RuntimeError.

    The error message must match the pattern passed to ``pytest.raises``.
    """
    mock_run.return_value = _RC(1)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
        run_job(**job_kwargs)
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
    """With ``skip_datax=True`` run_job returns 0 after exactly one subprocess call."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        skip_datax=True,
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    assert mock_run.call_count == 1
|