ソースを参照

test(datax): partition + runner 单测(11 条)

partition 7 条(dt 计算 / 月边界 / 分区 ini / 非分区 / 缺 writer.path
/ dt 与 stop-1 对齐 / execute_ddls 空列表 no-op)
runner 4 条(本机 / 远端 ssh / gen 失败抛错 / skip_datax 只生成)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 週間 前
コミット
8180983c7f
2 ファイル変更、138 行追加、0 行削除
  1. 65 0
      tests/unit/datax/test_partition.py
  2. 73 0
      tests/unit/datax/test_runner.py

+ 65 - 0
tests/unit/datax/test_partition.py

@@ -0,0 +1,65 @@
+# -*- coding:utf-8 -*-
+import textwrap
+from pathlib import Path
+from unittest.mock import patch
+
+from dw_base.datax.partition import compute_partition_dt, execute_ddls, parse_ini_partition
+
+
+def _write_ini(tmp_path: Path, content: str) -> str:  # test helper: write an ini fixture file
+    p = tmp_path / 'x.ini'
+    p.write_text(textwrap.dedent(content))  # remove the literal's common leading indentation first
+    return str(p)  # hand back the file path as a plain string
+
+
+def test_compute_partition_dt_stop_minus_1():  # expects the day before the given date
+    assert compute_partition_dt('20260423') == '20260422'
+
+
+def test_compute_partition_dt_month_boundary():  # 1 April is expected to roll back to 31 March
+    assert compute_partition_dt('20260401') == '20260331'
+
+
+def test_parse_partitioned_writer(tmp_path):  # writer.path containing dt=${dt} should yield an ADD PARTITION DDL
+    ini = _write_ini(tmp_path, '''\
+        [writer]
+        dataSource = hdfs/prd-ha
+        path = /user/hive/warehouse/test.db/raw_usr_app_user_cert_info_inc_d/dt=${dt}/
+    ''')
+    ddl = parse_ini_partition(ini, stop_date='20260423')
+    assert ddl == ('ALTER TABLE test.raw_usr_app_user_cert_info_inc_d '
+                   'ADD IF NOT EXISTS PARTITION(dt=20260422);')
+
+
+def test_parse_non_partitioned(tmp_path):  # writer.path without a dt= segment: expects None
+    ini = _write_ini(tmp_path, '''\
+        [writer]
+        dataSource = hdfs/xx
+        path = /user/hive/warehouse/test.db/non_partitioned/
+    ''')
+    assert parse_ini_partition(ini, stop_date='20260423') is None
+
+
+def test_parse_no_writer_path(tmp_path):  # [writer] section has no path key: expects None
+    ini = _write_ini(tmp_path, '''\
+        [writer]
+        dataSource = mongo/xx
+    ''')
+    assert parse_ini_partition(ini, stop_date='20260423') is None
+
+
+def test_parse_dt_aligns_with_stop_minus_1_for_multiday(tmp_path):
+    # Multi-day range start=20260401 stop=20260410: dt should be stop-1 = 20260409
+    # Guards against the multi-day partition misalignment caused by the old script's START_DATE=dt assumption
+    ini = _write_ini(tmp_path, '''\
+        [writer]
+        path = /user/hive/warehouse/db1.db/t1/dt=${dt}/
+    ''')
+    ddl = parse_ini_partition(ini, stop_date='20260410')
+    assert 'PARTITION(dt=20260409)' in ddl
+
+
+@patch('dw_base.datax.partition.subprocess.run')
+def test_execute_ddls_empty_is_noop(mock_run):  # an empty DDL list must not invoke subprocess.run
+    execute_ddls([])
+    mock_run.assert_not_called()

+ 73 - 0
tests/unit/datax/test_runner.py

@@ -0,0 +1,73 @@
+# -*- coding:utf-8 -*-
+from unittest.mock import patch
+
+import pytest
+
+from dw_base.datax.runner import run_job
+
+
+class _RC:  # minimal stand-in for the mocked subprocess.run result; carries only returncode
+    def __init__(self, code): self.returncode = code
+
+
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):  # worker_host == current_host
+    mock_run.return_value = _RC(0)
+    rc = run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhmaster02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+    )
+    assert rc == 0
+    assert mock_run.call_count == 2  # generate json + run datax.py
+    first_argv = mock_run.call_args_list[0][0][0]  # first positional argument of the first call
+    assert first_argv[0] == '/usr/bin/python3'
+    assert 'datax-job-config-generator.py' in first_argv[2]
+
+
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_remote_uses_ssh(mock_run, tmp_path):  # worker_host differs from current_host
+    mock_run.return_value = _RC(0)
+    rc = run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhnode02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+    )
+    assert rc == 0
+    first_argv = mock_run.call_args_list[0][0][0]  # first positional argument of the first call
+    assert first_argv[0] == 'ssh'
+    assert first_argv[1] == 'cdhnode02'
+    # remote_cmd is passed as a single string argument
+    assert 'datax-job-config-generator.py' in first_argv[2]
+
+
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_gen_failure_raises(mock_run, tmp_path):  # every mocked subprocess call returns code 1
+    mock_run.return_value = _RC(1)
+    with pytest.raises(RuntimeError, match='生成 DataX json 失败'):  # message: "failed to generate DataX json"
+        run_job(
+            ini_path=str(tmp_path / 'x.ini'),
+            start_date='20260422', stop_date='20260423',
+            worker_host='cdhmaster02', current_host='cdhmaster02',
+            base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+            datax_home='/opt/datax',
+        )
+
+
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):  # skip_datax=True: expects a single subprocess call
+    mock_run.return_value = _RC(0)
+    rc = run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhmaster02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+        skip_datax=True,
+    )
+    assert rc == 0
+    assert mock_run.call_count == 1  # one call only, versus two in the local test without skip_datax