| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- # -*- coding:utf-8 -*-
- from configparser import ConfigParser
- import pytest
- from dw_base.datax.tuning import _parse_hhmm, load_tuning_conf, merge_speed
- def test_parse_hhmm_standard():
- assert _parse_hhmm('07:50') == 750
- assert _parse_hhmm('19:00') == 1900
- assert _parse_hhmm('00:00') == 0
- assert _parse_hhmm('23:59') == 2359
- assert _parse_hhmm('00:05') == 5
- def test_parse_hhmm_invalid():
- with pytest.raises(ValueError, match='HH:MM'):
- _parse_hhmm('0750')
- with pytest.raises(ValueError, match='越界'):
- _parse_hhmm('24:00')
- with pytest.raises(ValueError, match='越界'):
- _parse_hhmm('07:60')
- with pytest.raises(ValueError, match='非数字'):
- _parse_hhmm('ab:cd')
- def test_load_tuning_conf(tmp_path):
- p = tmp_path / 'tuning.conf'
- p.write_text(
- '# 注释\n'
- 'key.a value1\n'
- '\n'
- 'key.b value with spaces\n'
- 'key.c\t42\n',
- encoding='utf-8'
- )
- cfg = load_tuning_conf(str(p))
- assert cfg['key.a'] == 'value1'
- assert cfg['key.b'] == 'value with spaces'
- assert cfg['key.c'] == '42'
- def _mk_l1():
- return {
- 'speed.relaxed_period.start': '06:00',
- 'speed.relaxed_period.stop': '12:00',
- 'speed.strict.channel': '10',
- 'speed.strict.byte': '10485760',
- 'speed.strict.record': '40000',
- 'speed.relaxed.channel': '6',
- 'speed.relaxed.byte': '268435456',
- 'speed.relaxed.record': '100000',
- }
- def test_merge_speed_l1_relaxed_period():
- """now=1000 在宽松区间 [600, 1200) 内,走 L1 relaxed 档。"""
- ini = ConfigParser()
- speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
- assert speed == {'channel': 6, 'byte': 268435456, 'record': 100000}
- def test_merge_speed_l1_strict_period():
- """now=1300 在宽松区间外,走 L1 strict 档。"""
- ini = ConfigParser()
- speed = merge_speed(_mk_l1(), ini, now_hhmm=1300)
- assert speed == {'channel': 10, 'byte': 10485760, 'record': 40000}
- def test_merge_speed_boundary_relaxed_start_inclusive():
- """now 恰 == relaxed_start(左闭)属宽松。"""
- ini = ConfigParser()
- speed = merge_speed(_mk_l1(), ini, now_hhmm=600)
- assert speed['channel'] == 6 # relaxed
- def test_merge_speed_boundary_relaxed_stop_exclusive():
- """now 恰 == relaxed_stop(右开)属严格。"""
- ini = ConfigParser()
- speed = merge_speed(_mk_l1(), ini, now_hhmm=1200)
- assert speed['channel'] == 10 # strict
- def test_merge_speed_l2_ini_overrides_l1_per_field():
- """L2 只写 channel,byte/record 回落 L1 当前时段。"""
- ini = ConfigParser()
- ini.add_section('speed')
- ini.set('speed', 'channel', '20')
- speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
- assert speed == {'channel': 20, 'byte': 268435456, 'record': 100000}
- def test_merge_speed_l3_cli_overrides_l2_l1():
- """L3 CLI 覆盖 L2 ini;CLI 里 None 的字段不覆盖。"""
- ini = ConfigParser()
- ini.add_section('speed')
- ini.set('speed', 'channel', '20')
- cli = {'channel': 30, 'byte': None, 'record': 50000}
- speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
- # channel: L3=30 胜;byte: None 不覆盖,回落 L1 relaxed;record: L3=50000
- assert speed == {'channel': 30, 'byte': 268435456, 'record': 50000}
- def test_merge_speed_l3_only_without_ini_section():
- """ini 没 [speed] 段也能正常工作,L3 直接覆盖 L1。"""
- ini = ConfigParser()
- cli = {'channel': 15, 'byte': None, 'record': None}
- speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
- assert speed == {'channel': 15, 'byte': 268435456, 'record': 100000}
|