# -*- coding:utf-8 -*- """ DataX 运行时调优参数三级合并。 三级: - L1 conf/datax-tuning.conf:项目级默认(严格/宽松时段 + speed 三参) - L2 ini [speed] 段:单 ini 级覆盖 - L3 CLI -channel/-byte/-record:运行时级覆盖 优先级:L1 < L2 < L3 合并发生在 JobConfigGenerator.assemble() 里,结果写进生成的 json。 ini / conf 文件本身不改。 """ import time from configparser import ConfigParser from typing import Dict, Optional def _parse_hhmm(s: str) -> int: """ 'HH:MM' → HHMM 整数,和 time.strftime('%H%M') 后 int() 的比较语义一致。 '07:50' → 750;'19:00' → 1900;'00:05' → 5 """ s = s.strip() parts = s.split(':') if len(parts) != 2: raise ValueError('时段配置不是 HH:MM 格式: ' + s) try: h, m = int(parts[0]), int(parts[1]) except ValueError: raise ValueError('时段配置非数字: ' + s) if not (0 <= h < 24 and 0 <= m < 60): raise ValueError('时段越界: ' + s) return h * 100 + m def load_tuning_conf(path: str) -> Dict[str, str]: """ 读 conf/datax-tuning.conf,返回 {key: value_str} 扁平 dict。 格式:`key value`,# 注释行,空行忽略(对齐 conf/spark-tuning.conf 风格)。 """ conf = {} with open(path, 'r', encoding='utf-8') as fh: for line in fh: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(None, 1) if len(parts) != 2: continue key, value = parts conf[key.strip()] = value.strip() return conf def merge_speed( l1: Dict[str, str], ini_cp: ConfigParser, cli_overrides: Optional[Dict[str, int]] = None, now_hhmm: Optional[int] = None, ) -> Dict[str, int]: """ 合并 speed 三参(channel/byte/record):L1 < L2 ini [speed] < L3 CLI。 时段分档仅在 L1 层生效:L1 按 now 属严格/宽松选档;一旦 L2/L3 指定某字段, 直接覆盖,不再看时段。逐字段 merge,非整段替换。 Args: l1: load_tuning_conf 返回的扁平 dict ini_cp: DataX ini 的 ConfigParser(已 read) cli_overrides: L3 CLI 传入,形如 {'channel': 20, 'byte': None, 'record': None} value 为 None 表示 CLI 未传该项,不覆盖 now_hhmm: 测试注入用;默认当前系统时刻 HHMM 整数 Returns: {'channel': int, 'byte': int, 'record': int} """ if now_hhmm is None: now_hhmm = int(time.strftime('%H%M')) # 1) L1:按时段选档(以宽松时段为基准,区间内 relaxed、区间外 strict) relaxed_start = _parse_hhmm(l1['speed.relaxed_period.start']) relaxed_stop = _parse_hhmm(l1['speed.relaxed_period.stop']) if relaxed_start <= now_hhmm < relaxed_stop: bucket = 'relaxed' else: bucket = 'strict' print('[tuning] 当前时刻 {n:04d} → {b} 时段(宽松 [{s:04d}, {e:04d}))'.format( n=now_hhmm, b=bucket, s=relaxed_start, e=relaxed_stop)) speed = {} for key in ('channel', 'byte', 'record'): l1_key = 'speed.{b}.{k}'.format(b=bucket, k=key) speed[key] = int(l1[l1_key]) print('[tuning] L1 speed.{k} => {v}'.format(k=key, v=speed[key])) # 2) L2 ini [speed] 段 if ini_cp.has_section('speed'): for key in ('channel', 'byte', 'record'): if ini_cp.has_option('speed', key): raw = ini_cp.get('speed', key).strip() if raw: speed[key] = int(raw) print('[tuning] L2 ini [speed] {k} => {v} 覆盖 L1'.format(k=key, v=speed[key])) # 3) L3 CLI if cli_overrides: for key in ('channel', 'byte', 'record'): if cli_overrides.get(key) is not None: speed[key] = int(cli_overrides[key]) print('[tuning] L3 CLI -{k} => {v} 覆盖 L2'.format(k=key, v=speed[key])) print('[tuning] 最终 speed = {s}'.format(s=speed)) return speed