| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # -*- coding:utf-8 -*-
- """
- DataxImport / DataxExport 门面类单测。
- 不跑真实 subprocess / hive / ssh,全部 mock;只验"流程串联"(调了哪些模块、顺序、参数)。
- 真实链路由 tests/integration/datax/hive_import/ 集成测试覆盖。
- """
- import textwrap
- from pathlib import Path
- from unittest.mock import patch
- import pytest
- from dw_base.datax.entry import DataxExport, DataxImport
@pytest.fixture
def fake_env(tmp_path: Path):
    """Lay out a throwaway project tree for the facade tests.

    Creates:
      * ``<tmp>/release/proj`` as the project root,
      * ``<tmp>/workers.ini`` with a ``[release]`` host and ``[weights]``,
      * one smoke-test job ini at ``<project>/jobs/raw/sample.ini``.

    Returns a dict holding the constructor kwargs shared by DataxImport /
    DataxExport, plus ``sample_ini`` (path of the job ini) and ``ini_dir``
    (the directory containing it).
    """
    release_root = tmp_path / 'release'
    project_root = release_root / 'proj'
    jobs_dir = project_root / 'jobs' / 'raw'
    project_root.mkdir(parents=True)
    jobs_dir.mkdir(parents=True)

    workers_ini = tmp_path / 'workers.ini'
    workers_ini.write_text(
        '[release]\n'
        'host = master1\n'
        '[weights]\n'
        'master1 = 1\n'
        'node1 = 3\n',
        encoding='utf-8',
    )

    job_ini = jobs_dir / 'sample.ini'
    # "${dt}" is written literally; it is not a Python placeholder.
    job_ini.write_text(
        '[reader]\n'
        'dataSource = postgresql/dev-x\n'
        '[writer]\n'
        'dataSource = hdfs/prd-ha\n'
        'path = /user/hive/warehouse/test.db/raw_sample/dt=${dt}/\n',
        encoding='utf-8',
    )

    return {
        'base_dir': str(project_root),
        'workers_ini_path': str(workers_ini),
        'release_user': 'bigdata',
        'release_root_dir': str(release_root),
        'python3_path': '/usr/bin/python3',
        'datax_home': '/opt/datax',
        'log_root_dir': str(tmp_path / 'log'),
        'sample_ini': str(job_ini),
        'ini_dir': str(jobs_dir),
    }
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_calls_partition_then_runner(mock_exec_ddls, mock_run, fake_env):
    """Import pre-creates the Hive partition first, then runs the DataX job.

    The test name promises an ordering ("partition then runner"). Each
    run_job call therefore snapshots whether execute_ddls had already been
    invoked at that moment; checking ``.called`` only afterwards could not
    tell the two orders apart.
    """
    ddls_done_at_run = []

    def _snapshot_then_succeed(*args, **kwargs):
        # Record the partition-phase state at the instant the job starts,
        # then behave like the patched return_value (rc 0 = success).
        ddls_done_at_run.append(mock_exec_ddls.called)
        return 0

    mock_run.side_effect = _snapshot_then_succeed

    e = _make_importer(fake_env)
    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
                   start_date='20260422', stop_date='20260423')

    assert failed == 0
    assert mock_exec_ddls.called  # partition pre-creation was invoked
    # First positional argument of execute_ddls is the collection of DDLs.
    ddls_called = mock_exec_ddls.call_args[0][0]
    assert any('PARTITION(dt=20260422)' in d for d in ddls_called)
    assert mock_run.called
    # Every runner invocation happened after the DDLs had been executed.
    assert all(ddls_done_at_run)
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_skip_partition_bypasses_ddl(mock_exec_ddls, mock_run, fake_env):
    """``skip_partition=True`` skips the DDL phase but still runs the job.

    The run's return value is checked as well: with run_job succeeding,
    skipping partitions must not be reported as a failure.
    """
    e = _make_importer(fake_env)
    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
                   start_date='20260422', stop_date='20260423',
                   skip_partition=True)
    assert failed == 0
    assert not mock_exec_ddls.called
    assert mock_run.called
@patch('dw_base.datax.entry.runner.run_job', return_value=1)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_returns_failure_count(mock_exec_ddls, mock_run, fake_env):
    """A non-zero run_job rc for the single ini makes run() return 1."""
    importer = _make_importer(fake_env)
    n_failed = importer.run(
        inis=[fake_env['sample_ini']],
        inis_dirs=[],
        start_date='20260422',
        stop_date='20260423',
    )
    assert n_failed == 1
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_export_no_partition_phase(mock_exec_ddls, mock_run, fake_env):
    """Export never pre-creates partitions; it goes straight to the runner."""
    ctor_keys = (
        'base_dir',
        'workers_ini_path',
        'release_user',
        'release_root_dir',
        'python3_path',
        'datax_home',
        'log_root_dir',
    )
    exporter = DataxExport(**{key: fake_env[key] for key in ctor_keys})
    rc = exporter.run(
        inis=[fake_env['sample_ini']],
        inis_dirs=[],
        start_date='20260422',
        stop_date='20260423',
    )
    assert rc == 0
    assert not mock_exec_ddls.called
    assert mock_run.called
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_expands_ini_dir(mock_exec_ddls, mock_run, fake_env):
    """Ini files found under ``inis_dirs`` are run like explicitly listed inis.

    The fixture directory holds exactly one ini (sample.ini) and the window
    is a single day. The runner must therefore fire exactly once, which
    catches both "nothing discovered" and "discovered twice"; a bare
    ``.called`` check only catches the former.
    """
    e = _make_importer(fake_env)
    failed = e.run(inis=[], inis_dirs=[fake_env['ini_dir']],
                   start_date='20260422', stop_date='20260423')
    assert failed == 0
    assert mock_run.call_count == 1
def _make_importer(fake_env):
    """Build a DataxImport pointed at the fake project described by *fake_env*.

    Only the constructor keys are forwarded; the fixture's extra entries
    (``sample_ini``, ``ini_dir``) are left out.
    """
    ctor_keys = (
        'base_dir',
        'workers_ini_path',
        'release_user',
        'release_root_dir',
        'python3_path',
        'datax_home',
        'log_root_dir',
    )
    return DataxImport(**{key: fake_env[key] for key in ctor_keys})
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_backfill_loops_days_inclusive_start_exclusive_stop(mock_exec_ddls, mock_run, fake_env):
    """backfill iterates [start, stop) day by day: 3 days -> 3 single-day run_job calls."""
    importer = _make_importer(fake_env)
    rc = importer.backfill(
        inis=[fake_env['sample_ini']],
        inis_dirs=[],
        start_date='20260420',
        stop_date='20260423',
    )
    assert rc == 0

    # 3 days x 1 ini -> run_job invoked 3 times.
    assert mock_run.call_count == 3

    # Each call is a one-day slice, passed as start_date / stop_date kwargs.
    expected_windows = [
        ('20260420', '20260421'),
        ('20260421', '20260422'),
        ('20260422', '20260423'),
    ]
    observed_windows = [
        (recorded.kwargs['start_date'], recorded.kwargs['stop_date'])
        for recorded in mock_run.call_args_list
    ]
    assert observed_windows == expected_windows
@patch('dw_base.datax.entry.runner.run_job', return_value=1)  # every run_job call fails
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_backfill_accumulates_failure_count(mock_exec_ddls, mock_run, fake_env):
    """All 3 days fail, so backfill returns 3."""
    importer = _make_importer(fake_env)
    total_failed = importer.backfill(
        inis=[fake_env['sample_ini']],
        inis_dirs=[],
        start_date='20260420',
        stop_date='20260423',
    )
    assert total_failed == 3
@pytest.mark.parametrize('start_date, stop_date', [
    ('20260423', '20260423'),  # empty range: stop == start
    ('20260423', '20260420'),  # reversed range: stop < start
])
@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_backfill_invalid_range_raises(mock_exec_ddls, mock_run, fake_env,
                                       start_date, stop_date):
    """``stop_date <= start_date`` raises ValueError before any day is processed.

    The contract is ``<=``, so both the empty (==) and reversed (<) ranges
    are exercised. "No day runs" is checked on both the job runner and the
    partition DDL phase.
    """
    e = _make_importer(fake_env)
    with pytest.raises(ValueError, match='stop_date 必须大于 start_date'):
        e.backfill(
            inis=[fake_env['sample_ini']], inis_dirs=[],
            start_date=start_date, stop_date=stop_date,
        )
    assert mock_run.call_count == 0
    assert mock_exec_ddls.call_count == 0
|