# -*- coding:utf-8 -*-
"""
Unit tests for the DataxImport / DataxExport facade classes.

No real subprocess / hive / ssh is executed; everything is mocked. These tests
only verify the orchestration: which modules get called, in which order, and
with which arguments. The real end-to-end path is covered by the integration
tests under tests/integration/datax/hive_import/.
"""
import textwrap
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from dw_base.datax.entry import DataxExport, DataxImport

# Keys of the ``fake_env`` dict that are forwarded as-is to the facade
# constructors; the remaining keys (``sample_ini``, ``ini_dir``) are inputs
# for ``run()`` only.
_FACADE_KWARGS = (
    'base_dir',
    'workers_ini_path',
    'release_user',
    'release_root_dir',
    'python3_path',
    'datax_home',
    'log_root_dir',
)


def _make_facade(facade_cls, env):
    """Instantiate ``facade_cls`` with the constructor arguments held in ``env``.

    :param facade_cls: ``DataxImport`` or ``DataxExport``.
    :param env: the dict produced by the ``fake_env`` fixture.
    :return: a new ``facade_cls`` instance.
    """
    return facade_cls(**{key: env[key] for key in _FACADE_KWARGS})


@pytest.fixture
def fake_env(tmp_path: Path) -> dict:
    """Build a fake project root, a workers.ini and one smoke-test job ini.

    Returns a dict holding the facade constructor arguments plus the paths of
    the sample ini file (``sample_ini``) and of its directory (``ini_dir``).
    """
    base_dir = tmp_path / 'release' / 'proj'
    base_dir.mkdir(parents=True)

    workers_ini = tmp_path / 'workers.ini'
    workers_ini.write_text(textwrap.dedent('''\
        [release]
        host = master1

        [weights]
        master1 = 1
        node1 = 3
        '''), encoding='utf-8')

    ini_dir = base_dir / 'jobs' / 'raw'
    ini_dir.mkdir(parents=True)
    sample_ini = ini_dir / 'sample.ini'
    sample_ini.write_text(textwrap.dedent('''\
        [reader]
        dataSource = postgresql/dev-x

        [writer]
        dataSource = hdfs/prd-ha
        path = /user/hive/warehouse/test.db/raw_sample/dt=${dt}/
        '''), encoding='utf-8')

    return dict(
        base_dir=str(base_dir),
        workers_ini_path=str(workers_ini),
        release_user='bigdata',
        release_root_dir=str(tmp_path / 'release'),
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        log_root_dir=str(tmp_path / 'log'),
        sample_ini=str(sample_ini),
        ini_dir=str(ini_dir),
    )


@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_calls_partition_then_runner(mock_exec_ddls, mock_run, fake_env):
    """Import pre-creates partitions first and only then launches the job."""
    # Attach both mocks to one parent so their calls land in a single,
    # chronologically ordered ``mock_calls`` list.
    call_log = MagicMock()
    call_log.attach_mock(mock_exec_ddls, 'execute_ddls')
    call_log.attach_mock(mock_run, 'run_job')

    e = _make_facade(DataxImport, fake_env)
    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
                   start_date='20260422', stop_date='20260423')

    assert failed == 0
    assert mock_exec_ddls.called  # partition pre-creation was invoked
    ddls_called = mock_exec_ddls.call_args[0][0]
    assert any('PARTITION(dt=20260422)' in d for d in ddls_called)
    assert mock_run.called

    # The test name promises an ordering: the DDL phase must precede the
    # first job launch.
    order = [recorded[0] for recorded in call_log.mock_calls]
    assert order.index('execute_ddls') < order.index('run_job')


@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_skip_partition_bypasses_ddl(mock_exec_ddls, mock_run, fake_env):
    """With ``skip_partition=True`` no DDL is executed, but the job still runs."""
    e = _make_facade(DataxImport, fake_env)
    e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
          start_date='20260422', stop_date='20260423',
          skip_partition=True)

    assert not mock_exec_ddls.called
    assert mock_run.called


@patch('dw_base.datax.entry.runner.run_job', return_value=1)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_returns_failure_count(mock_exec_ddls, mock_run, fake_env):
    """``run()`` reports 1 when the (mocked) job runner returns 1."""
    e = _make_facade(DataxImport, fake_env)
    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
                   start_date='20260422', stop_date='20260423')

    assert failed == 1


@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_export_no_partition_phase(mock_exec_ddls, mock_run, fake_env):
    """Export never executes partition DDLs; it only launches the job."""
    e = _make_facade(DataxExport, fake_env)
    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
                   start_date='20260422', stop_date='20260423')

    assert failed == 0
    assert not mock_exec_ddls.called
    assert mock_run.called


@patch('dw_base.datax.entry.runner.run_job', return_value=0)
@patch('dw_base.datax.entry.partition.execute_ddls')
def test_import_expands_ini_dir(mock_exec_ddls, mock_run, fake_env):
    """Passing a directory via ``inis_dirs`` leads to a job launch."""
    e = _make_facade(DataxImport, fake_env)
    e.run(inis=[], inis_dirs=[fake_env['ini_dir']],
          start_date='20260422', stop_date='20260423')

    # The directory scan picks up sample.ini, so runner.run_job gets called.
    assert mock_run.called