فهرست منبع

test(datax): path_utils + worker 单测(11 条)

path_utils 5 条(绝对/相对/无扩展 job_name + json/log 模板)
worker 6 条(ini 加载 + 三连回退各分支 + 加权池展开)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 هفته پیش
والد
کامیت
f133e17ce4
2فایلهای تغییر یافته به همراه101 افزوده شده و 0 حذف شده
  1. 35 0
      tests/unit/datax/test_path_utils.py
  2. 66 0
      tests/unit/datax/test_worker.py

+ 35 - 0
tests/unit/datax/test_path_utils.py

@@ -0,0 +1,35 @@
+# -*- coding:utf-8 -*-
+import os
+
+from dw_base.datax.path_utils import job_name_from_ini, json_output_path, log_path
+
+
+def test_job_name_from_ini_absolute():
+    assert job_name_from_ini(
+        '/home/bigdata/release/poyee-data-warehouse/tests/integration/datax/hive_import/app_user_cert_info.ini'
+    ) == 'app_user_cert_info'
+
+
+def test_job_name_from_ini_relative():
+    assert job_name_from_ini('jobs/raw/usr/xxx.ini') == 'xxx'
+
+
+def test_job_name_from_ini_no_ext():
+    assert job_name_from_ini('xxx') == 'xxx'
+
+
+def test_json_output_path_flat():
+    out = json_output_path(
+        '/opt/release/poyee-data-warehouse',
+        'tests/integration/datax/hive_import/app_user_cert_info.ini',
+    )
+    assert os.path.normpath(out) == os.path.normpath(
+        '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info.json'
+    )
+
+
+def test_log_path_template():
+    out = log_path('/home/bigdata/log', 'datax', '20260422', 'app_user_cert_info')
+    assert os.path.normpath(out) == os.path.normpath(
+        '/home/bigdata/log/datax/20260422/app_user_cert_info.log'
+    )

+ 66 - 0
tests/unit/datax/test_worker.py

@@ -0,0 +1,66 @@
+# -*- coding:utf-8 -*-
+import random
+import textwrap
+from pathlib import Path
+
+import pytest
+
+from dw_base.datax.worker import load_workers_ini, select_worker
+
+
+@pytest.fixture
+def pool_ini(tmp_path: Path) -> str:
+    p = tmp_path / 'workers.ini'
+    p.write_text(textwrap.dedent('''\
+        [release]
+        host = cdhmaster02
+
+        [weights]
+        cdhmaster02 = 1
+        cdhnode01 = 3
+        cdhnode02 = 3
+        cdhnode03 = 3
+    '''))
+    return str(p)
+
+
+def test_load_workers_ini(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    assert pool.release_host == 'cdhmaster02'
+    assert pool.weights == {'cdhmaster02': 1, 'cdhnode01': 3, 'cdhnode02': 3, 'cdhnode03': 3}
+    assert len(pool.queue) == 10
+    assert pool.queue.count('cdhnode01') == 3
+    assert pool.queue.count('cdhmaster02') == 1
+
+
+def test_select_worker_non_release_user_falls_back_local(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    assert select_worker(pool, is_release_user=False, is_in_release_dir=True,
+                         current_host='dev-box', use_random=True) == 'dev-box'
+
+
+def test_select_worker_outside_release_dir_falls_back_local(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    assert select_worker(pool, is_release_user=True, is_in_release_dir=False,
+                         current_host='cdhmaster02', use_random=True) == 'cdhmaster02'
+
+
+def test_select_worker_explicit_host_overrides_random(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    assert select_worker(pool, is_release_user=True, is_in_release_dir=True,
+                         current_host='cdhmaster02',
+                         host='cdhnode02', use_random=True) == 'cdhnode02'
+
+
+def test_select_worker_random_pick_in_queue(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    rand = random.Random(42)
+    pick = select_worker(pool, is_release_user=True, is_in_release_dir=True,
+                         current_host='cdhmaster02', use_random=True, rand=rand)
+    assert pick in pool.queue
+
+
+def test_select_worker_nothing_specified_falls_back_local(pool_ini):
+    pool = load_workers_ini(pool_ini)
+    assert select_worker(pool, is_release_user=True, is_in_release_dir=True,
+                         current_host='cdhmaster02') == 'cdhmaster02'