# -*- coding:utf-8 -*-
"""Unit tests for ``dw_base.datax.runner``."""
from unittest.mock import patch

import pytest

from dw_base.datax.runner import run_job


class _RC:
    """Minimal stand-in for a ``subprocess.run`` result: exposes only ``returncode``."""

    def __init__(self, code):
        self.returncode = code


def _job_kwargs(tmp_path, **overrides):
    """Return the ``run_job`` keyword arguments shared by the tests below.

    By default ``worker_host`` equals ``current_host`` (a local run). Any
    keyword in *overrides* replaces the corresponding default.
    """
    kwargs = {
        'ini_path': str(tmp_path / 'x.ini'),
        'start_date': '20260422',
        'stop_date': '20260423',
        'worker_host': 'cdhmaster02',
        'current_host': 'cdhmaster02',
        'base_dir': str(tmp_path),
        'python3_path': '/usr/bin/python3',
        'datax_home': '/opt/datax',
    }
    kwargs.update(overrides)
    return kwargs


@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
    """Local worker: run_job makes two subprocess calls, the first being gen-json."""
    mock_run.return_value = _RC(0)

    rc = run_job(**_job_kwargs(tmp_path))

    assert rc == 0
    # One call to generate the DataX json, one to execute datax.py.
    assert mock_run.call_count == 2
    gen_args, _ = mock_run.call_args_list[0]
    gen_argv = gen_args[0]
    assert gen_argv[0] == '/usr/bin/python3'
    assert 'dw_base.datax.cli' in gen_argv
    assert 'gen-json' in gen_argv


@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_remote_uses_ssh(mock_run, tmp_path):
    """Remote worker: the first subprocess call goes through ``ssh <worker_host>``."""
    mock_run.return_value = _RC(0)

    rc = run_job(**_job_kwargs(tmp_path, worker_host='cdhnode02'))

    assert rc == 0
    gen_args, _ = mock_run.call_args_list[0]
    ssh_bin, host, remote_cmd = gen_args[0][:3]
    assert ssh_bin == 'ssh'
    assert host == 'cdhnode02'
    # The remote command is a single string argument of the form "cd <base_dir> && ...".
    assert remote_cmd.startswith('cd ')
    assert 'dw_base.datax.cli' in remote_cmd
    assert 'gen-json' in remote_cmd


@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
    """A local subprocess must carry PYTHONPATH=base_dir so ``python -m`` can find the dw_base package."""
    mock_run.return_value = _RC(0)

    run_job(**_job_kwargs(tmp_path))

    _, gen_kwargs = mock_run.call_args_list[0]
    assert 'env' in gen_kwargs
    assert str(tmp_path) in gen_kwargs['env']['PYTHONPATH']
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_gen_failure_raises(mock_run, tmp_path):
    """A non-zero exit from the gen-json subprocess makes run_job raise RuntimeError."""
    # NOTE(review): _hdfs_src_check is not patched here, so this presumably relies on it
    # returning 'n/a' for the nonexistent x.ini without calling subprocess -- confirm.
    mock_run.return_value = _RC(1)
    with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
        run_job(
            ini_path=str(tmp_path / 'x.ini'),
            start_date='20260422',
            stop_date='20260423',
            worker_host='cdhmaster02',
            current_host='cdhmaster02',
            base_dir=str(tmp_path),
            python3_path='/usr/bin/python3',
            datax_home='/opt/datax',
        )


@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
    """skip_datax=True: exactly one subprocess call (gen-json) is made."""
    mock_run.return_value = _RC(0)
    rc = run_job(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        skip_datax=True,
    )
    assert rc == 0
    assert mock_run.call_count == 1


import itertools
import textwrap
import types

from dw_base.datax.runner import _hdfs_src_check

# [reader] section for an HDFS source whose path contains a ${dt} placeholder;
# shared by the _hdfs_src_check tests that exercise the HDFS branch.
_HDFS_READER_INI = '''\
    [reader]
    dataSource = hdfs/prd-ha
    path = /user/hive/warehouse/x/dt=${dt}/
'''


def _ini_with_reader(tmp_path, reader_body):
    """Write the dedented *reader_body* to ``tmp_path/reader.ini`` and return its path as str."""
    ini_path = tmp_path / 'reader.ini'
    ini_path.write_text(textwrap.dedent(reader_body), encoding='utf-8')
    return str(ini_path)


def test_hdfs_src_check_na_for_non_hdfs_reader(tmp_path):
    """A reader whose dataSource is not HDFS is reported as 'n/a'."""
    ini = _ini_with_reader(tmp_path, '''\
        [reader]
        dataSource = postgresql/dev-x
        path = /some/path
    ''')
    assert _hdfs_src_check(ini, '20260422') == 'n/a'


def test_hdfs_src_check_na_when_no_reader(tmp_path):
    """An empty ini (no [reader] section) is reported as 'n/a'."""
    ini = tmp_path / 'empty.ini'
    ini.write_text('', encoding='utf-8')
    assert _hdfs_src_check(str(ini), '20260422') == 'n/a'


@patch('dw_base.datax.runner.subprocess.run')
def test_hdfs_src_check_missing(mock_run, tmp_path):
    """`hadoop fs -test -e` exiting non-zero -> 'missing'."""
    mock_run.return_value = _RC(1)
    ini = _ini_with_reader(tmp_path, _HDFS_READER_INI)
    assert _hdfs_src_check(ini, '20260422') == 'missing'


@patch('dw_base.datax.runner.subprocess.run')
def test_hdfs_src_check_empty_when_du_size_zero(mock_run, tmp_path):
    """`test -e` succeeds, then `du -s` reports "0 ..." -> 'empty'."""
    du_zero = types.SimpleNamespace(returncode=0, stdout=b'0 0 /path\n', stderr=b'')
    # First subprocess call (test -e) succeeds; every later call (du -s) reports size 0.
    mock_run.side_effect = itertools.chain([_RC(0)], itertools.repeat(du_zero))
    ini = _ini_with_reader(tmp_path, _HDFS_READER_INI)
    assert _hdfs_src_check(ini, '20260422') == 'empty'


@patch('dw_base.datax.runner._hdfs_src_check', return_value='missing')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_fails_when_hdfs_src_missing(mock_run, mock_check, tmp_path):
    """By default a 'missing' HDFS source gives rc=1 (fail, not a silent skip)."""
    mock_run.return_value = _RC(0)
    rc = run_job(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )
    assert rc == 1
    # Pin the failure to the HDFS check: x.ini does not exist, so rc=1 with no
    # subprocess calls could otherwise come from an unrelated early exit.
    mock_check.assert_called()
    # Fails fast: neither gen-json nor datax is run.
    assert mock_run.call_count == 0


@patch('dw_base.datax.runner._hdfs_src_check', return_value='empty')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_fails_when_hdfs_src_empty(mock_run, mock_check, tmp_path):
    """By default an 'empty' HDFS source gives rc=1 (fail)."""
    mock_run.return_value = _RC(0)
    rc = run_job(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )
    assert rc == 1
    # Same guard as the 'missing' case: the failure must come from the check.
    mock_check.assert_called()
    assert mock_run.call_count == 0


@patch('dw_base.datax.runner._hdfs_src_check')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_skip_check_bypasses_hdfs_check(mock_run, mock_check, tmp_path):
    """skip_check=True: _hdfs_src_check is not called; gen-json and datax both run."""
    mock_run.return_value = _RC(0)
    rc = run_job(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        skip_check=True,
    )
    assert rc == 0
    mock_check.assert_not_called()
    assert mock_run.call_count == 2  # gen + exec both still run