Ver Fonte

feat(datax): speed 三级注入(L1 conf/tuning + L2 ini [speed] + L3 CLI)

- 新 conf/datax-tuning.conf 承载 L1 默认:严格/宽松时段 + 时段边界(HH:MM)
- 新 dw_base/datax/tuning.py:load_tuning_conf + merge_speed + HH:MM 解析
- job_config_generator.assemble() 从硬编码改为 tuning.merge_speed(L1, ini, cli)
- cli.py gen-json 加 -channel/-byte/-record,透传 JobConfigGenerator
- runner / entry / bin 入口链路透传 speed_overrides(dict)
- 三级合并每 key 打印来源日志便于审计
- tests/unit/datax/test_tuning.py 加 10 条(HH:MM / load / merge 三层 / 边界)
- test_cli.py 加 2 条 CLI 透传测试
- 时段判断左闭右开 [start, stop)
- fetchSize / batchSize 维持现状(硬编码兜底 + ini 覆盖),不进本轮 scope

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu há 2 semanas atrás
pai
commit
07eb3e4ae1

+ 10 - 0
bin/datax-hdfs-export-starter.py

@@ -44,6 +44,13 @@ def main():
                         help='只生成 json 不执行 datax.py')
     parser.add_argument('-skip-check', action='store_true',
                         help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
+    # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数)
+    parser.add_argument('-channel', type=int, default=None,
+                        help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
+    parser.add_argument('-byte', type=int, default=None,
+                        help='L3 speed.byte 覆盖(单位 bytes)')
+    parser.add_argument('-record', type=int, default=None,
+                        help='L3 speed.record 覆盖')
     args = parser.parse_args()
 
     # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
@@ -66,6 +73,8 @@ def main():
         datax_home=os.environ['DATAX_HOME'],
         log_root_dir=os.environ['LOG_ROOT_DIR'],
     )
+    speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record}
+
     failed = exporter.run(
         inis=args.ini,
         inis_dirs=args.inis,
@@ -76,6 +85,7 @@ def main():
         parallel=args.parallel,
         skip_datax=args.skip_datax,
         skip_check=args.skip_check,
+        speed_overrides=speed_overrides,
     )
     sys.exit(failed)
 

+ 11 - 0
bin/datax-hive-import-starter.py

@@ -73,6 +73,13 @@ def main():
     # -----------------------------------------------------------------------------
     parser.add_argument('-backfill', action='store_true',
                         help='【高级用法】存量回填:-start-date/-stop-date 作外层范围按日循环(DS 任务不加此 flag)')
+    # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数)
+    parser.add_argument('-channel', type=int, default=None,
+                        help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
+    parser.add_argument('-byte', type=int, default=None,
+                        help='L3 speed.byte 覆盖(单位 bytes)')
+    parser.add_argument('-record', type=int, default=None,
+                        help='L3 speed.record 覆盖')
     args = parser.parse_args()
 
     # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
@@ -87,6 +94,8 @@ def main():
     print('  start_date={s} stop_date={e} backfill={b}'.format(
         s=args.start_date, e=args.stop_date, b=args.backfill))
 
+    speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record}
+
     importer = DataxImport(
         base_dir=project_root_dir,
         workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
@@ -109,6 +118,7 @@ def main():
             skip_partition=args.skip_partition,
             skip_datax=args.skip_datax,
             skip_check=args.skip_check,
+            speed_overrides=speed_overrides,
             extra_partition_tables=args.extra_partition_tables,
         )
         sys.exit(total_failed)
@@ -124,6 +134,7 @@ def main():
         skip_partition=args.skip_partition,
         skip_datax=args.skip_datax,
         skip_check=args.skip_check,
+        speed_overrides=speed_overrides,
         extra_partition_tables=args.extra_partition_tables,
     )
     sys.exit(failed)

+ 19 - 0
conf/datax-tuning.conf

@@ -0,0 +1,19 @@
+# DataX 默认参数(L1 全局,优先级最低)
+# 覆盖规则:L1 本文件 < L2 ini [speed] 段 < L3 CLI -channel/-byte/-record
+# 加载入口:dw_base/datax/job_config_generator.py 生成 json 前
+
+# ========== speed ==========
+# 严格时段:显式指定起止(HH:MM 24 小时制)
+# 此区间内走 strict 资源档(每通道吞吐低,保护业务 DB 从库)
+# 此区间外自动走 relaxed 档(每通道吞吐高)
+# 判断采用左闭右开 [start, stop);start 时分属严格,stop 时分属宽松
+speed.strict_period.start          07:50
+speed.strict_period.stop           19:00
+
+speed.strict.channel               10
+speed.strict.byte                  10485760
+speed.strict.record                40000
+
+speed.relaxed.channel              6
+speed.relaxed.byte                 268435456
+speed.relaxed.record               100000

+ 17 - 2
dw_base/datax/cli.py

@@ -4,10 +4,13 @@ DataX Python 层 CLI 入口(供 runner 本机调用 + 远端 ssh 调用统一
 
 当前子命令:
   gen-json <ini> -start-date <yyyymmdd> -stop-date <yyyymmdd>
+                 [-channel N] [-byte N] [-record N]
     读 ini 调 JobConfigGenerator 生成 json 到 conf/datax-json/{job_name}.json
+    -channel/-byte/-record 是 L3 CLI speed 覆盖(L1 conf < L2 ini [speed] < L3 CLI)
 
 用法:
-  python3 -m dw_base.datax.cli gen-json <ini> -start-date 20260422 -stop-date 20260423
+  python3 -m dw_base.datax.cli gen-json <ini> -start-date 20260422 -stop-date 20260423 \
+    -channel 20 -byte 20971520 -record 50000
 """
 import argparse
 import os
@@ -20,7 +23,13 @@ def _cmd_gen_json(args):
     base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
     out = json_output_path(base_dir, args.ini)
     os.makedirs(os.path.dirname(out), exist_ok=True)
-    gen = JobConfigGenerator(base_dir, args.ini, args.start_date, args.stop_date, out)
+    cli_speed_overrides = {
+        'channel': args.channel,
+        'byte': args.byte,
+        'record': args.record,
+    }
+    gen = JobConfigGenerator(base_dir, args.ini, args.start_date, args.stop_date, out,
+                             cli_speed_overrides=cli_speed_overrides)
     gen.run()
     print('生成 DataX json: {out}'.format(out=out))
 
@@ -34,6 +43,12 @@ def main(argv=None):
     g.add_argument('ini', help='DataX ini 路径')
     g.add_argument('-start-date', required=True, dest='start_date', metavar='YYYYMMDD')
     g.add_argument('-stop-date', required=True, dest='stop_date', metavar='YYYYMMDD')
+    g.add_argument('-channel', type=int, default=None,
+                   help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
+    g.add_argument('-byte', type=int, default=None,
+                   help='L3 speed.byte 覆盖(单位 bytes)')
+    g.add_argument('-record', type=int, default=None,
+                   help='L3 speed.record 覆盖')
     g.set_defaults(func=_cmd_gen_json)
 
     args = parser.parse_args(argv)

+ 13 - 4
dw_base/datax/entry.py

@@ -66,7 +66,8 @@ class _BaseDatax:
                       use_random: bool,
                       parallel: bool,
                       skip_datax: bool,
-                      skip_check: bool):
+                      skip_check: bool,
+                      speed_overrides: Optional[dict] = None):
         is_rel_user = _is_release_user(self.release_user)
         is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)
 
@@ -94,6 +95,7 @@ class _BaseDatax:
                         datax_home=self.datax_home,
                         skip_datax=skip_datax,
                         skip_check=skip_check,
+                        speed_overrides=speed_overrides,
                         stdout=fh, stderr=fh,
                     )
                 print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
@@ -106,6 +108,7 @@ class _BaseDatax:
                     datax_home=self.datax_home,
                     skip_datax=skip_datax,
                     skip_check=skip_check,
+                    speed_overrides=speed_overrides,
                     tee_to=fh,
                 )
             print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
@@ -130,6 +133,7 @@ class DataxImport(_BaseDatax):
             skip_partition: bool = False,
             skip_datax: bool = False,
             skip_check: bool = False,
+            speed_overrides: Optional[dict] = None,
             extra_partition_tables: Optional[List[str]] = None) -> int:
         """
         Returns: 失败任务数(0 = 全部成功)
@@ -153,7 +157,8 @@ class DataxImport(_BaseDatax):
                         tbl=tbl, dt=dt))
             partition.execute_ddls(ddls)
 
-        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check)
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check,
+                                     speed_overrides)
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed
 
@@ -168,6 +173,7 @@ class DataxImport(_BaseDatax):
                  skip_partition: bool = False,
                  skip_datax: bool = False,
                  skip_check: bool = False,
+                 speed_overrides: Optional[dict] = None,
                  extra_partition_tables: Optional[List[str]] = None) -> int:
         """
         存量回填:start_date/stop_date 作外层范围 [含, 不含),按日循环调 self.run() 单日语义。
@@ -198,6 +204,7 @@ class DataxImport(_BaseDatax):
                 skip_partition=skip_partition,
                 skip_datax=skip_datax,
                 skip_check=skip_check,
+                speed_overrides=speed_overrides,
                 extra_partition_tables=extra_partition_tables,
             )
             if failed > 0:
@@ -227,7 +234,8 @@ class DataxExport(_BaseDatax):
             use_random: bool = False,
             parallel: bool = False,
             skip_datax: bool = False,
-            skip_check: bool = False) -> int:
+            skip_check: bool = False,
+            speed_overrides: Optional[dict] = None) -> int:
         """
         Returns: 失败任务数(0 = 全部成功)
         """
@@ -236,6 +244,7 @@ class DataxExport(_BaseDatax):
         ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
         if not ini_list:
             return 0
-        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check)
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check,
+                                     speed_overrides)
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed

+ 27 - 26
dw_base/datax/job_config_generator.py

@@ -1,34 +1,40 @@
 # -*- coding:utf-8 -*-
 
 import json
+import os
 from configparser import ConfigParser
+from typing import Dict, Optional
 
 from dw_base.datax.datax_constants import *
 from dw_base.datax.plugins.plugin_factory import PluginFactory
-from dw_base.utils import datetime_utils
+from dw_base.datax.tuning import load_tuning_conf, merge_speed
 from dw_base.utils.file_utils import delete_file, get_abs_path
 
 
 class JobConfigGenerator(object):
     """
-    生成DataX作业配置文件
+    生成 DataX 作业配置文件(json)。
+
+    speed 三参走三级合并:L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 cli_speed_overrides。
+    合并后的 speed 写进 job.setting.speed + core.transport.channel.speed。
     """
 
-    def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str):
+    def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str,
+                 cli_speed_overrides: Optional[Dict[str, int]] = None):
         """
-        初始化
         Args:
             base_dir: 项目目录
-            generator_config: DataX作业配置生成器配置文件,格式为ini文件,包含两部分reader和writer,两部分都包含dataSource配置
-            start_date: 内参,开始日期
-            stop_date: 内参,结束日期
-            output: 结果(DataX作业配置文件)输出文件路径
+            generator_config: DataX 作业配置生成器配置文件路径(.ini)
+            start_date / stop_date: 内部日期
+            output: 生成的 DataX json 输出路径
+            cli_speed_overrides: L3 CLI 覆盖,形如 {'channel': 20, 'byte': None, 'record': None}
         """
         self.generator_config = get_abs_path(generator_config)
         self.base_dir = base_dir
         self.start_date = start_date
         self.stop_date = stop_date
         self.output = output
+        self.cli_speed_overrides = cli_speed_overrides or {}
         self.config_parser = ConfigParser()
         self.config_parser.read(self.generator_config)
 
@@ -40,31 +46,26 @@ class JobConfigGenerator(object):
         writer = PluginFactory.get_plugin('writer', self.base_dir, self.config_parser, self.start_date, self.stop_date)
         return writer.configure()
 
-    @staticmethod
-    def get_speed(channel=6, byte=268435456, record=100000):
-        return {JOB_SETTING_SPEED_CHANNEL: channel, JOB_SETTING_SPEED_BYTE: byte, JOB_SETTING_SPEED_RECORD: record}
-
-    @staticmethod
-    def get_core_speed(byte=268435456, record=100000):
-        return {
+    def assemble(self):
+        # speed 三级合并(L1 conf < L2 ini [speed] < L3 CLI)
+        tuning_conf_path = os.path.join(self.base_dir, 'conf', 'datax-tuning.conf')
+        l1 = load_tuning_conf(tuning_conf_path)
+        merged = merge_speed(l1, self.config_parser, self.cli_speed_overrides)
+        speed = {
+            JOB_SETTING_SPEED_CHANNEL: merged['channel'],
+            JOB_SETTING_SPEED_BYTE: merged['byte'],
+            JOB_SETTING_SPEED_RECORD: merged['record'],
+        }
+        core_speed = {
             'transport': {
                 'channel': {
                     'speed': {
-                        'byte': byte,
-                        'record': record
+                        'byte': merged['byte'],
+                        'record': merged['record'],
                     }
                 }
             }
         }
-
-    def assemble(self):
-        local_time = int(datetime_utils.formatted_now('%H%M'))
-        if 750 < local_time < 1900:
-            speed = self.get_speed(10, byte=10485760, record=40000)
-            core_speed = self.get_core_speed(byte=10485760, record=40000)
-        else:
-            speed = self.get_speed()
-            core_speed = self.get_core_speed()
         job_config_json = {
             'job': {
                 'content': [

+ 7 - 0
dw_base/datax/runner.py

@@ -75,6 +75,7 @@ def run_job(ini_path: str,
             datax_home: str,
             skip_datax: bool = False,
             skip_check: bool = False,
+            speed_overrides: dict = None,
             stdout=None,
             stderr=None,
             tee_to=None) -> int:
@@ -83,6 +84,7 @@ def run_job(ini_path: str,
 
     skip_check=False(默认):hdfs reader 跑源路径存在性 check,missing/empty → return 1 失败
     skip_check=True:跳过整段 check,直接生成 json + 跑 datax(对齐老 silent skip 行为)
+    speed_overrides: L3 CLI 透传到 gen-json,形如 {'channel': 20, 'byte': None, 'record': 50000}
     """
     # 1. HDFS 源路径 check(默认开启;--skip-check 显式关闭后走老 silent skip 行为)
     if not skip_check:
@@ -104,6 +106,11 @@ def run_job(ini_path: str,
         '-start-date', start_date,
         '-stop-date', stop_date,
     ]
+    # L3 CLI speed 透传(None 不传)
+    if speed_overrides:
+        for key in ('channel', 'byte', 'record'):
+            if speed_overrides.get(key) is not None:
+                gen_argv.extend(['-' + key, str(speed_overrides[key])])
     print('[datax] {ini} 开始生成 json @ worker={w}'.format(ini=ini_path, w=worker_host))
     gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
                                   stdout=stdout, stderr=stderr, tee_to=tee_to)

+ 115 - 0
dw_base/datax/tuning.py

@@ -0,0 +1,115 @@
+# -*- coding:utf-8 -*-
+"""
+DataX 运行时调优参数三级合并。
+
+三级:
+- L1 conf/datax-tuning.conf:项目级默认(严格/宽松时段 + speed 三参)
+- L2 ini [speed] 段:单 ini 级覆盖
+- L3 CLI -channel/-byte/-record:运行时级覆盖
+
+优先级:L1 < L2 < L3
+
+合并发生在 JobConfigGenerator.assemble() 里,结果写进生成的 json。
+ini / conf 文件本身不改。
+"""
+import time
+from configparser import ConfigParser
+from typing import Dict, Optional
+
+
+def _parse_hhmm(s: str) -> int:
+    """
+    Parse 'HH:MM' into an HHMM integer, comparable with int(time.strftime('%H%M')).
+    '07:50' → 750; '19:00' → 1900; '00:05' → 5. Raises ValueError on bad input.
+    """
+    s = s.strip()
+    parts = s.split(':')
+    if len(parts) != 2:
+        raise ValueError('时段配置不是 HH:MM 格式: ' + s)
+    try:
+        h, m = int(parts[0]), int(parts[1])
+    except ValueError:
+        raise ValueError('时段配置非数字: ' + s)
+    if not (0 <= h < 24 and 0 <= m < 60):
+        raise ValueError('时段越界: ' + s)
+    return h * 100 + m
+
+
+def load_tuning_conf(path: str) -> Dict[str, str]:
+    """
+    Read conf/datax-tuning.conf into a flat {key: value_str} dict.
+    Line format: `key value`; '#' comment lines and blank lines are ignored
+    (same style as conf/spark-tuning.conf). Lines without a value are skipped.
+    """
+    conf = {}
+    with open(path, 'r', encoding='utf-8') as fh:
+        for line in fh:
+            line = line.strip()
+            if not line or line.startswith('#'):
+                continue
+            # split on first run of whitespace: key, then the rest as the value
+            parts = line.split(None, 1)
+            if len(parts) != 2:
+                continue
+            key, value = parts
+            conf[key.strip()] = value.strip()
+    return conf
+
+
+def merge_speed(
+    l1: Dict[str, str],
+    ini_cp: ConfigParser,
+    cli_overrides: Optional[Dict[str, int]] = None,
+    now_hhmm: Optional[int] = None,
+) -> Dict[str, int]:
+    """
+    Merge the three speed params (channel/byte/record): L1 < L2 ini [speed] < L3 CLI.
+
+    Time-bucket selection applies only at L1: the strict/relaxed tier is picked by
+    `now`; once L2/L3 sets a field it wins outright, regardless of the period.
+    The merge is per-field, not a whole-section replacement.
+
+    Args:
+        l1: flat dict returned by load_tuning_conf
+        ini_cp: ConfigParser of the DataX ini (already read)
+        cli_overrides: L3 CLI values, e.g. {'channel': 20, 'byte': None, 'record': None};
+                       None means the flag was not passed and does not override
+        now_hhmm: test injection hook; defaults to the current clock as an HHMM int
+
+    Returns:
+        {'channel': int, 'byte': int, 'record': int}
+    """
+    if now_hhmm is None:
+        now_hhmm = int(time.strftime('%H%M'))
+
+    # 1) L1: pick the strict/relaxed tier by time of day — half-open [start, stop)
+    strict_start = _parse_hhmm(l1['speed.strict_period.start'])
+    strict_stop = _parse_hhmm(l1['speed.strict_period.stop'])
+    if strict_start <= now_hhmm < strict_stop:
+        bucket = 'strict'
+    else:
+        bucket = 'relaxed'
+    print('[tuning] 当前时刻 {n:04d} → {b} 时段(严格 [{s:04d}, {e:04d}))'.format(
+        n=now_hhmm, b=bucket, s=strict_start, e=strict_stop))
+
+    speed = {}
+    for key in ('channel', 'byte', 'record'):
+        l1_key = 'speed.{b}.{k}'.format(b=bucket, k=key)
+        speed[key] = int(l1[l1_key])
+        print('[tuning] L1 speed.{k} => {v}'.format(k=key, v=speed[key]))
+
+    # 2) L2: per-field override from the ini [speed] section (empty values ignored)
+    if ini_cp.has_section('speed'):
+        for key in ('channel', 'byte', 'record'):
+            if ini_cp.has_option('speed', key):
+                raw = ini_cp.get('speed', key).strip()
+                if raw:
+                    speed[key] = int(raw)
+                    print('[tuning] L2 ini [speed] {k} => {v} 覆盖 L1'.format(k=key, v=speed[key]))
+
+    # 3) L3: per-field override from CLI flags (None = flag not passed)
+    if cli_overrides:
+        for key in ('channel', 'byte', 'record'):
+            if cli_overrides.get(key) is not None:
+                speed[key] = int(cli_overrides[key])
+                print('[tuning] L3 CLI -{k} => {v} 覆盖 L2'.format(k=key, v=speed[key]))
+
+    print('[tuning] 最终 speed = {s}'.format(s=speed))
+    return speed

+ 31 - 0
tests/unit/datax/test_cli.py

@@ -31,3 +31,34 @@ def test_gen_json_missing_args_exits():
 def test_unknown_subcommand_exits():
     with pytest.raises(SystemExit):
         cli.main(['bogus-cmd'])
+
+
+@patch('dw_base.datax.cli.JobConfigGenerator')
+def test_gen_json_speed_cli_args_flow_through(MockGen):
+    """L3 CLI -channel/-byte/-record reach JobConfigGenerator via the cli_speed_overrides kwarg."""
+    mock_inst = MagicMock()
+    MockGen.return_value = mock_inst
+    cli.main(['gen-json', 'some.ini',
+              '-start-date', '20260422',
+              '-stop-date', '20260423',
+              '-channel', '20',
+              '-byte', '20971520',
+              '-record', '50000'])
+    kwargs = MockGen.call_args[1]
+    assert kwargs['cli_speed_overrides'] == {
+        'channel': 20, 'byte': 20971520, 'record': 50000,
+    }
+
+
+@patch('dw_base.datax.cli.JobConfigGenerator')
+def test_gen_json_speed_cli_default_none(MockGen):
+    """Without speed args, every cli_speed_overrides value is None (no L3 override)."""
+    mock_inst = MagicMock()
+    MockGen.return_value = mock_inst
+    cli.main(['gen-json', 'some.ini',
+              '-start-date', '20260422',
+              '-stop-date', '20260423'])
+    kwargs = MockGen.call_args[1]
+    assert kwargs['cli_speed_overrides'] == {
+        'channel': None, 'byte': None, 'record': None,
+    }

+ 110 - 0
tests/unit/datax/test_tuning.py

@@ -0,0 +1,110 @@
+# -*- coding:utf-8 -*-
+from configparser import ConfigParser
+
+import pytest
+
+from dw_base.datax.tuning import _parse_hhmm, load_tuning_conf, merge_speed
+
+
+def test_parse_hhmm_standard():
+    """Well-formed HH:MM strings map to their HHMM integer."""
+    assert _parse_hhmm('07:50') == 750
+    assert _parse_hhmm('19:00') == 1900
+    assert _parse_hhmm('00:00') == 0
+    assert _parse_hhmm('23:59') == 2359
+    assert _parse_hhmm('00:05') == 5
+
+
+def test_parse_hhmm_invalid():
+    """Malformed or out-of-range input raises ValueError with a matching message."""
+    with pytest.raises(ValueError, match='HH:MM'):
+        _parse_hhmm('0750')
+    with pytest.raises(ValueError, match='越界'):
+        _parse_hhmm('24:00')
+    with pytest.raises(ValueError, match='越界'):
+        _parse_hhmm('07:60')
+    with pytest.raises(ValueError, match='非数字'):
+        _parse_hhmm('ab:cd')
+
+
+def test_load_tuning_conf(tmp_path):
+    """Comments and blanks are skipped; values keep embedded spaces; tab separator works."""
+    p = tmp_path / 'tuning.conf'
+    p.write_text(
+        '# 注释\n'
+        'key.a  value1\n'
+        '\n'
+        'key.b  value with spaces\n'
+        'key.c\t42\n',
+        encoding='utf-8'
+    )
+    cfg = load_tuning_conf(str(p))
+    assert cfg['key.a'] == 'value1'
+    assert cfg['key.b'] == 'value with spaces'
+    assert cfg['key.c'] == '42'
+
+
+def _mk_l1():
+    """L1 fixture mirroring the defaults in conf/datax-tuning.conf."""
+    return {
+        'speed.strict_period.start': '07:50',
+        'speed.strict_period.stop': '19:00',
+        'speed.strict.channel': '10',
+        'speed.strict.byte': '10485760',
+        'speed.strict.record': '40000',
+        'speed.relaxed.channel': '6',
+        'speed.relaxed.byte': '268435456',
+        'speed.relaxed.record': '100000',
+    }
+
+
+def test_merge_speed_l1_strict_period():
+    """now=1000 falls inside the strict period → L1 strict tier."""
+    ini = ConfigParser()
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
+    assert speed == {'channel': 10, 'byte': 10485760, 'record': 40000}
+
+
+def test_merge_speed_l1_relaxed_period():
+    """now=2000 falls outside the strict period → L1 relaxed tier."""
+    ini = ConfigParser()
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=2000)
+    assert speed == {'channel': 6, 'byte': 268435456, 'record': 100000}
+
+
+def test_merge_speed_boundary_strict_start_inclusive():
+    """now exactly == strict_start is strict (closed left bound)."""
+    ini = ConfigParser()
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=750)
+    assert speed['channel'] == 10  # strict
+
+
+def test_merge_speed_boundary_strict_stop_exclusive():
+    """now exactly == strict_stop is relaxed (open right bound)."""
+    ini = ConfigParser()
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=1900)
+    assert speed['channel'] == 6  # relaxed
+
+
+def test_merge_speed_l2_ini_overrides_l1_per_field():
+    """L2 sets only channel; byte/record fall back to the L1 tier for the current period."""
+    ini = ConfigParser()
+    ini.add_section('speed')
+    ini.set('speed', 'channel', '20')
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
+    assert speed == {'channel': 20, 'byte': 10485760, 'record': 40000}
+
+
+def test_merge_speed_l3_cli_overrides_l2_l1():
+    """L3 CLI beats L2 ini; None fields in the CLI dict do not override."""
+    ini = ConfigParser()
+    ini.add_section('speed')
+    ini.set('speed', 'channel', '20')
+    cli = {'channel': 30, 'byte': None, 'record': 50000}
+    speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
+    # channel: L3=30 wins; byte: None → falls back to L1 strict; record: L3=50000
+    assert speed == {'channel': 30, 'byte': 10485760, 'record': 50000}
+
+
+def test_merge_speed_l3_only_without_ini_section():
+    """Works without an ini [speed] section; L3 overrides L1 directly."""
+    ini = ConfigParser()
+    cli = {'channel': 15, 'byte': None, 'record': None}
+    speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
+    assert speed == {'channel': 15, 'byte': 10485760, 'record': 40000}