# -*- coding:utf-8 -*- from pathlib import Path import pytest from dw_base.datax.batch import expand_ini_inputs, run_batch def test_expand_ini_inputs_single_and_dir(tmp_path: Path): d = tmp_path / 'inis' d.mkdir() (d / 'a.ini').write_text('', encoding='utf-8') (d / 'b.ini').write_text('', encoding='utf-8') (d / 'c.txt').write_text('', encoding='utf-8') # 非 .ini 应被忽略 extra = tmp_path / 'x.ini' extra.write_text('', encoding='utf-8') result = expand_ini_inputs([str(extra)], [str(d)]) names = [Path(p).name for p in result] assert names == ['x.ini', 'a.ini', 'b.ini'] # 先 ini 参数、再目录扫描、排序 def test_expand_ini_inputs_dedup(tmp_path: Path): d = tmp_path / 'inis' d.mkdir() (d / 'a.ini').write_text('', encoding='utf-8') # 同一 ini 既通过 -ini 又通过 -inis 传入,只保留一份 result = expand_ini_inputs([str(d / 'a.ini')], [str(d)]) assert len(result) == 1 def test_expand_ini_inputs_dir_not_exist_raises(tmp_path: Path): with pytest.raises(ValueError, match='ini 目录不存在'): expand_ini_inputs([], [str(tmp_path / 'nonexistent')]) def test_run_batch_serial_success_and_fail_counts(): calls = [] def run_one(ini): calls.append(ini) return 0 if 'good' in ini else 1 success, failed = run_batch(['good_1.ini', 'bad_1.ini', 'good_2.ini'], run_one, parallel=False) assert success == 2 assert failed == 1 assert calls == ['good_1.ini', 'bad_1.ini', 'good_2.ini'] # 串行顺序 def test_run_batch_empty_inis(): assert run_batch([], lambda p: 0) == (0, 0) def test_run_batch_parallel_counts(): import threading lock = threading.Lock() seen = [] def run_one(ini): with lock: seen.append(ini) return 0 if 'good' in ini else 1 success, failed = run_batch(['good_1.ini', 'bad_1.ini', 'good_2.ini'], run_one, parallel=True, sleep_between=0) assert success == 2 assert failed == 1 assert set(seen) == {'good_1.ini', 'bad_1.ini', 'good_2.ini'} # 并行无顺序保证 def test_run_batch_parallel_exception_counts_as_failure(): def run_one(ini): raise RuntimeError('boom') success, failed = run_batch(['a.ini'], run_one, parallel=True, sleep_between=0) assert success == 0 assert failed == 1