Kaynağa Gözat

test(datax): batch + entry 单测(12 条)

batch 7(ini 展开 / 去重 / 目录不存在 / 串行计数 / 空 ini / 并行计数 / 异常算失败)
entry 5(import 调 partition → runner / skip_partition 跳 DDL / 失败计数传递
  / export 无分区阶段 / import 目录扫描展开)
整套 43 条单测本地全过

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 2 hafta önce
ebeveyn
işleme
a6f82b1976
2 değiştirilmiş dosya ile 226 ekleme ve 0 silme
  1. 79 0
      tests/unit/datax/test_batch.py
  2. 147 0
      tests/unit/datax/test_entry.py

+ 79 - 0
tests/unit/datax/test_batch.py

@@ -0,0 +1,79 @@
+# -*- coding:utf-8 -*-
+from pathlib import Path
+
+import pytest
+
+from dw_base.datax.batch import expand_ini_inputs, run_batch
+
+
+def test_expand_ini_inputs_single_and_dir(tmp_path: Path):
+    d = tmp_path / 'inis'
+    d.mkdir()
+    (d / 'a.ini').write_text('', encoding='utf-8')
+    (d / 'b.ini').write_text('', encoding='utf-8')
+    (d / 'c.txt').write_text('', encoding='utf-8')  # 非 .ini 应被忽略
+
+    extra = tmp_path / 'x.ini'
+    extra.write_text('', encoding='utf-8')
+
+    result = expand_ini_inputs([str(extra)], [str(d)])
+    names = [Path(p).name for p in result]
+    assert names == ['x.ini', 'a.ini', 'b.ini']  # 先 ini 参数、再目录扫描、排序
+
+
+def test_expand_ini_inputs_dedup(tmp_path: Path):
+    d = tmp_path / 'inis'
+    d.mkdir()
+    (d / 'a.ini').write_text('', encoding='utf-8')
+
+    # 同一 ini 既通过 -ini 又通过 -inis 传入,只保留一份
+    result = expand_ini_inputs([str(d / 'a.ini')], [str(d)])
+    assert len(result) == 1
+
+
+def test_expand_ini_inputs_dir_not_exist_raises(tmp_path: Path):
+    with pytest.raises(ValueError, match='ini 目录不存在'):
+        expand_ini_inputs([], [str(tmp_path / 'nonexistent')])
+
+
+def test_run_batch_serial_success_and_fail_counts():
+    calls = []
+
+    def run_one(ini):
+        calls.append(ini)
+        return 0 if 'good' in ini else 1
+
+    success, failed = run_batch(['good_1.ini', 'bad_1.ini', 'good_2.ini'], run_one, parallel=False)
+    assert success == 2
+    assert failed == 1
+    assert calls == ['good_1.ini', 'bad_1.ini', 'good_2.ini']  # 串行顺序
+
+
+def test_run_batch_empty_inis():
+    assert run_batch([], lambda p: 0) == (0, 0)
+
+
+def test_run_batch_parallel_counts():
+    import threading
+    lock = threading.Lock()
+    seen = []
+
+    def run_one(ini):
+        with lock:
+            seen.append(ini)
+        return 0 if 'good' in ini else 1
+
+    success, failed = run_batch(['good_1.ini', 'bad_1.ini', 'good_2.ini'],
+                                run_one, parallel=True, sleep_between=0)
+    assert success == 2
+    assert failed == 1
+    assert set(seen) == {'good_1.ini', 'bad_1.ini', 'good_2.ini'}  # 并行无顺序保证
+
+
+def test_run_batch_parallel_exception_counts_as_failure():
+    def run_one(ini):
+        raise RuntimeError('boom')
+
+    success, failed = run_batch(['a.ini'], run_one, parallel=True, sleep_between=0)
+    assert success == 0
+    assert failed == 1

+ 147 - 0
tests/unit/datax/test_entry.py

@@ -0,0 +1,147 @@
+# -*- coding:utf-8 -*-
+"""
+DataxImport / DataxExport 门面类单测。
+
+不跑真实 subprocess / hive / ssh,全部 mock;只验"流程串联"(调了哪些模块、顺序、参数)。
+真实链路由 tests/integration/datax/hive_import/ 集成测试覆盖。
+"""
+import textwrap
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from dw_base.datax.entry import DataxExport, DataxImport
+
+
+@pytest.fixture
+def fake_env(tmp_path: Path):
+    """造一个假项目根 + workers.ini + 1 份冒烟 ini。"""
+    base_dir = tmp_path / 'release' / 'proj'
+    base_dir.mkdir(parents=True)
+
+    workers_ini = tmp_path / 'workers.ini'
+    workers_ini.write_text(textwrap.dedent('''\
+        [release]
+        host = master1
+        [weights]
+        master1 = 1
+        node1 = 3
+    '''), encoding='utf-8')
+
+    ini_dir = base_dir / 'jobs' / 'raw'
+    ini_dir.mkdir(parents=True)
+    sample_ini = ini_dir / 'sample.ini'
+    sample_ini.write_text(textwrap.dedent('''\
+        [reader]
+        dataSource = postgresql/dev-x
+        [writer]
+        dataSource = hdfs/prd-ha
+        path = /user/hive/warehouse/test.db/raw_sample/dt=${dt}/
+    '''), encoding='utf-8')
+
+    return dict(
+        base_dir=str(base_dir),
+        workers_ini_path=str(workers_ini),
+        release_user='bigdata',
+        release_root_dir=str(tmp_path / 'release'),
+        python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+        log_root_dir=str(tmp_path / 'log'),
+        sample_ini=str(sample_ini),
+        ini_dir=str(ini_dir),
+    )
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_import_calls_partition_then_runner(mock_exec_ddls, mock_run, fake_env):
+    e = DataxImport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
+                   start_date='20260422', stop_date='20260423')
+    assert failed == 0
+    assert mock_exec_ddls.called  # 分区预建被调
+    ddls_called = mock_exec_ddls.call_args[0][0]
+    assert any('PARTITION(dt=20260422)' in d for d in ddls_called)
+    assert mock_run.called
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_import_skip_partition_bypasses_ddl(mock_exec_ddls, mock_run, fake_env):
+    e = DataxImport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+    e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
+          start_date='20260422', stop_date='20260423',
+          skip_partition=True)
+    assert not mock_exec_ddls.called
+    assert mock_run.called
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=1)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_import_returns_failure_count(mock_exec_ddls, mock_run, fake_env):
+    e = DataxImport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
+                   start_date='20260422', stop_date='20260423')
+    assert failed == 1
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_export_no_partition_phase(mock_exec_ddls, mock_run, fake_env):
+    e = DataxExport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+    failed = e.run(inis=[fake_env['sample_ini']], inis_dirs=[],
+                   start_date='20260422', stop_date='20260423')
+    assert failed == 0
+    assert not mock_exec_ddls.called
+    assert mock_run.called
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_import_expands_ini_dir(mock_exec_ddls, mock_run, fake_env):
+    e = DataxImport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+    e.run(inis=[], inis_dirs=[fake_env['ini_dir']],
+          start_date='20260422', stop_date='20260423')
+    # 目录扫到 sample.ini,runner.run_job 被调
+    assert mock_run.called