# -*- coding:utf-8 -*- from configparser import ConfigParser import pytest from dw_base.datax.tuning import _parse_hhmm, load_tuning_conf, merge_speed def test_parse_hhmm_standard(): assert _parse_hhmm('07:50') == 750 assert _parse_hhmm('19:00') == 1900 assert _parse_hhmm('00:00') == 0 assert _parse_hhmm('23:59') == 2359 assert _parse_hhmm('00:05') == 5 def test_parse_hhmm_invalid(): with pytest.raises(ValueError, match='HH:MM'): _parse_hhmm('0750') with pytest.raises(ValueError, match='越界'): _parse_hhmm('24:00') with pytest.raises(ValueError, match='越界'): _parse_hhmm('07:60') with pytest.raises(ValueError, match='非数字'): _parse_hhmm('ab:cd') def test_load_tuning_conf(tmp_path): p = tmp_path / 'tuning.conf' p.write_text( '# 注释\n' 'key.a value1\n' '\n' 'key.b value with spaces\n' 'key.c\t42\n', encoding='utf-8' ) cfg = load_tuning_conf(str(p)) assert cfg['key.a'] == 'value1' assert cfg['key.b'] == 'value with spaces' assert cfg['key.c'] == '42' def _mk_l1(): return { 'speed.relaxed_period.start': '06:00', 'speed.relaxed_period.stop': '12:00', 'speed.strict.channel': '10', 'speed.strict.byte': '10485760', 'speed.strict.record': '40000', 'speed.relaxed.channel': '6', 'speed.relaxed.byte': '268435456', 'speed.relaxed.record': '100000', } def test_merge_speed_l1_relaxed_period(): """now=1000 在宽松区间 [600, 1200) 内,走 L1 relaxed 档。""" ini = ConfigParser() speed = merge_speed(_mk_l1(), ini, now_hhmm=1000) assert speed == {'channel': 6, 'byte': 268435456, 'record': 100000} def test_merge_speed_l1_strict_period(): """now=1300 在宽松区间外,走 L1 strict 档。""" ini = ConfigParser() speed = merge_speed(_mk_l1(), ini, now_hhmm=1300) assert speed == {'channel': 10, 'byte': 10485760, 'record': 40000} def test_merge_speed_boundary_relaxed_start_inclusive(): """now 恰 == relaxed_start(左闭)属宽松。""" ini = ConfigParser() speed = merge_speed(_mk_l1(), ini, now_hhmm=600) assert speed['channel'] == 6 # relaxed def test_merge_speed_boundary_relaxed_stop_exclusive(): """now 恰 == relaxed_stop(右开)属严格。""" ini = ConfigParser() speed = merge_speed(_mk_l1(), ini, now_hhmm=1200) assert speed['channel'] == 10 # strict def test_merge_speed_l2_ini_overrides_l1_per_field(): """L2 只写 channel,byte/record 回落 L1 当前时段。""" ini = ConfigParser() ini.add_section('speed') ini.set('speed', 'channel', '20') speed = merge_speed(_mk_l1(), ini, now_hhmm=1000) assert speed == {'channel': 20, 'byte': 268435456, 'record': 100000} def test_merge_speed_l3_cli_overrides_l2_l1(): """L3 CLI 覆盖 L2 ini;CLI 里 None 的字段不覆盖。""" ini = ConfigParser() ini.add_section('speed') ini.set('speed', 'channel', '20') cli = {'channel': 30, 'byte': None, 'record': 50000} speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000) # channel: L3=30 胜;byte: None 不覆盖,回落 L1 relaxed;record: L3=50000 assert speed == {'channel': 30, 'byte': 268435456, 'record': 50000} def test_merge_speed_l3_only_without_ini_section(): """ini 没 [speed] 段也能正常工作,L3 直接覆盖 L1。""" ini = ConfigParser() cli = {'channel': 15, 'byte': None, 'record': None} speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000) assert speed == {'channel': 15, 'byte': 268435456, 'record': 100000}