| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- # -*- coding:utf-8 -*-
- from pathlib import Path
- import pytest
- from dw_base.datax.batch import expand_ini_inputs, run_batch
def test_expand_ini_inputs_single_and_dir(tmp_path: Path):
    """Check the combined result of one explicit ini file and one ini directory."""
    ini_dir = tmp_path / 'inis'
    ini_dir.mkdir()
    # 'c.txt' is not an .ini file and is expected to be ignored.
    for filename in ('a.ini', 'b.ini', 'c.txt'):
        (ini_dir / filename).write_text('', encoding='utf-8')
    single = tmp_path / 'x.ini'
    single.write_text('', encoding='utf-8')

    expanded = expand_ini_inputs([str(single)], [str(ini_dir)])

    # Expected order: explicit ini arguments first, then the directory scan, sorted.
    assert [Path(item).name for item in expanded] == ['x.ini', 'a.ini', 'b.ini']
def test_expand_ini_inputs_dedup(tmp_path: Path):
    """An ini supplied both directly and through its directory is returned once."""
    ini_dir = tmp_path / 'inis'
    ini_dir.mkdir()
    ini_file = ini_dir / 'a.ini'
    ini_file.write_text('', encoding='utf-8')

    # The same ini is passed via both -ini and -inis; only one copy should be kept.
    expanded = expand_ini_inputs([str(ini_file)], [str(ini_dir)])

    assert len(expanded) == 1
def test_expand_ini_inputs_dir_not_exist_raises(tmp_path: Path):
    """A directory argument that does not exist must raise ``ValueError``."""
    missing_dir = tmp_path / 'nonexistent'
    with pytest.raises(ValueError, match='ini 目录不存在'):
        expand_ini_inputs([], [str(missing_dir)])
def test_run_batch_serial_success_and_fail_counts():
    """Serial mode: a 0 return counts as success, non-zero as failure, in input order."""
    invoked = []

    def fake_runner(ini):
        # Record the call, then report success only for the "good" inis.
        invoked.append(ini)
        if 'good' in ini:
            return 0
        return 1

    inis = ['good_1.ini', 'bad_1.ini', 'good_2.ini']
    success, failed = run_batch(inis, fake_runner, parallel=False)

    assert success == 2
    assert failed == 1
    # Serial execution must visit the inis in the order they were given.
    assert invoked == ['good_1.ini', 'bad_1.ini', 'good_2.ini']
def test_run_batch_empty_inis():
    """An empty ini list yields zero successes and zero failures."""
    outcome = run_batch([], lambda p: 0)
    assert outcome == (0, 0)
def test_run_batch_parallel_counts():
    """Parallel mode: success/failure counts are checked; call order is not."""
    import threading

    guard = threading.Lock()
    visited = []

    def fake_runner(ini):
        # Guard the shared list, since the runner may be called from several threads.
        with guard:
            visited.append(ini)
        if 'good' in ini:
            return 0
        return 1

    inis = ['good_1.ini', 'bad_1.ini', 'good_2.ini']
    success, failed = run_batch(inis, fake_runner, parallel=True, sleep_between=0)

    assert success == 2
    assert failed == 1
    # Parallel execution gives no ordering guarantee, so compare as a set.
    expected = {'good_1.ini', 'bad_1.ini', 'good_2.ini'}
    assert set(visited) == expected
def test_run_batch_parallel_exception_counts_as_failure():
    """Parallel mode: an exception raised by the runner is counted as one failure."""

    def always_raises(ini):
        raise RuntimeError('boom')

    counts = run_batch(['a.ini'], always_raises, parallel=True, sleep_between=0)
    success, failed = counts

    assert success == 0
    assert failed == 1
|