| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- # -*- coding:utf-8 -*-
- from unittest.mock import patch
- import pytest
- from dw_base.datax.runner import run_job
- class _RC:
- def __init__(self, code): self.returncode = code
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
    """With worker_host equal to current_host, run_job makes two subprocess calls."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    # Two invocations: generate the json, then execute datax.py.
    assert mock_run.call_count == 2

    gen_call = mock_run.call_args_list[0]
    gen_argv = gen_call[0][0]
    assert gen_argv[0] == '/usr/bin/python3'
    for expected_token in ('dw_base.datax.cli', 'gen-json'):
        assert expected_token in gen_argv
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_remote_uses_ssh(mock_run, tmp_path):
    """With worker_host different from current_host, the first call goes through ssh."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhnode02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    gen_call = mock_run.call_args_list[0]
    gen_argv = gen_call[0][0]
    assert gen_argv[0] == 'ssh'
    assert gen_argv[1] == 'cdhnode02'

    # remote_cmd is a single string argument: cd base_dir && <cmd>
    remote_cmd = gen_argv[2]
    assert remote_cmd.startswith('cd ')
    for expected_token in ('dw_base.datax.cli', 'gen-json'):
        assert expected_token in remote_cmd
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
    """The local subprocess must carry PYTHONPATH=base_dir so `python -m` can find dw_base."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    run_job(**job_kwargs)

    # Keyword arguments of the first subprocess.run invocation.
    first_call = mock_run.call_args_list[0]
    passed_kwargs = first_call[1]
    assert 'env' in passed_kwargs
    pythonpath = passed_kwargs['env']['PYTHONPATH']
    assert str(tmp_path) in pythonpath
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_gen_failure_raises(mock_run, tmp_path):
    """A non-zero return code from the stubbed subprocess makes run_job raise RuntimeError."""
    mock_run.return_value = _RC(1)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    # The message pattern is matched against the raised error text.
    with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
        run_job(**job_kwargs)
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
    """With skip_datax=True, run_job returns 0 after a single subprocess call."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        skip_datax=True,
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    assert mock_run.call_count == 1
- import textwrap
- from dw_base.datax.runner import _hdfs_src_check
- def _ini_with_reader(tmp_path, reader_body):
- p = tmp_path / 'reader.ini'
- p.write_text(textwrap.dedent(reader_body), encoding='utf-8')
- return str(p)
def test_hdfs_src_check_na_for_non_hdfs_reader(tmp_path):
    """A reader whose dataSource is postgresql/... is reported as 'n/a'."""
    reader_section = '''\
        [reader]
        dataSource = postgresql/dev-x
        path = /some/path
    '''
    ini_file = _ini_with_reader(tmp_path, reader_section)

    status = _hdfs_src_check(ini_file, '20260422')

    assert status == 'n/a'
def test_hdfs_src_check_na_when_no_reader(tmp_path):
    """An empty ini file (no reader section at all) is reported as 'n/a'."""
    empty_ini = tmp_path / 'empty.ini'
    empty_ini.write_text('', encoding='utf-8')

    status = _hdfs_src_check(str(empty_ini), '20260422')

    assert status == 'n/a'
@patch('dw_base.datax.runner.subprocess.run')
def test_hdfs_src_check_missing(mock_run, tmp_path):
    """An hdfs reader is reported as 'missing' when the stubbed subprocess returns 1."""
    # hadoop fs -test -e returns non-zero -> missing
    mock_run.return_value = _RC(1)
    reader_section = '''\
        [reader]
        dataSource = hdfs/prd-ha
        path = /user/hive/warehouse/x/dt=${dt}/
    '''
    ini_file = _ini_with_reader(tmp_path, reader_section)

    status = _hdfs_src_check(ini_file, '20260422')

    assert status == 'missing'
@patch('dw_base.datax.runner.subprocess.run')
def test_hdfs_src_check_empty_when_du_size_zero(mock_run, tmp_path):
    """An hdfs reader is reported as 'empty' when the du output starts with size 0."""
    # First call (test -e) succeeds with 0; the second call (du -s) returns
    # "0 .." -> empty.
    seen_calls = []

    def fake_run(*args, **kwargs):
        seen_calls.append(args)
        if len(seen_calls) == 1:
            return _RC(0)
        # Ad-hoc result object carrying stdout/stderr in addition to returncode.
        return type('R', (), {'returncode': 0, 'stdout': b'0 0 /path\n', 'stderr': b''})()

    mock_run.side_effect = fake_run
    reader_section = '''\
        [reader]
        dataSource = hdfs/prd-ha
        path = /user/hive/warehouse/x/dt=${dt}/
    '''
    ini_file = _ini_with_reader(tmp_path, reader_section)

    status = _hdfs_src_check(ini_file, '20260422')

    assert status == 'empty'
@patch('dw_base.datax.runner._hdfs_src_check', return_value='missing')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_fails_when_hdfs_src_missing(mock_run, _mock_check, tmp_path):
    """By default an HDFS 'missing' result gives rc=1 (a failure, not a silent skip)."""
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 1
    # Fails right away -> neither gen nor exec is invoked.
    assert mock_run.call_count == 0
@patch('dw_base.datax.runner._hdfs_src_check', return_value='empty')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_fails_when_hdfs_src_empty(mock_run, _mock_check, tmp_path):
    """By default an HDFS 'empty' result gives rc=1 (a failure)."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 1
    assert mock_run.call_count == 0
@patch('dw_base.datax.runner._hdfs_src_check')
@patch('dw_base.datax.runner.subprocess.run')
def test_run_job_skip_check_bypasses_hdfs_check(mock_run, mock_check, tmp_path):
    """skip_check=True -> _hdfs_src_check is not called; gen-json and datax run directly."""
    mock_run.return_value = _RC(0)
    job_kwargs = dict(
        ini_path=str(tmp_path / 'x.ini'),
        start_date='20260422',
        stop_date='20260423',
        worker_host='cdhmaster02',
        current_host='cdhmaster02',
        base_dir=str(tmp_path),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        skip_check=True,
    )

    exit_code = run_job(**job_kwargs)

    assert exit_code == 0
    mock_check.assert_not_called()
    # Both gen and exec still run.
    assert mock_run.call_count == 2
|