| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- # -*- coding:utf-8 -*-
- """
- DataX 运行时调优参数三级合并。
- 三级:
- - L1 conf/datax-tuning.conf:项目级默认(严格/宽松时段 + speed 三参)
- - L2 ini [speed] 段:单 ini 级覆盖
- - L3 CLI -channel/-byte/-record:运行时级覆盖
- 优先级:L1 < L2 < L3
- 合并发生在 JobConfigGenerator.assemble() 里,结果写进生成的 json。
- ini / conf 文件本身不改。
- """
- import time
- from configparser import ConfigParser
- from typing import Dict, Optional
- def _parse_hhmm(s: str) -> int:
- """
- 'HH:MM' → HHMM 整数,和 time.strftime('%H%M') 后 int() 的比较语义一致。
- '07:50' → 750;'19:00' → 1900;'00:05' → 5
- """
- s = s.strip()
- parts = s.split(':')
- if len(parts) != 2:
- raise ValueError('时段配置不是 HH:MM 格式: ' + s)
- try:
- h, m = int(parts[0]), int(parts[1])
- except ValueError:
- raise ValueError('时段配置非数字: ' + s)
- if not (0 <= h < 24 and 0 <= m < 60):
- raise ValueError('时段越界: ' + s)
- return h * 100 + m
def load_tuning_conf(path: str) -> Dict[str, str]:
    """
    Read conf/datax-tuning.conf into a flat ``{key: value_str}`` dict.

    Format (aligned with the conf/spark-tuning.conf style): one ``key value``
    pair per line, split on the first run of whitespace. Blank lines and
    lines starting with ``#`` are ignored. If a key appears more than once,
    the last occurrence wins. Lines with a key but no value are skipped, and
    a warning is printed.

    Args:
        path: path of the tuning conf file.

    Returns:
        Mapping of key -> value string. Values are not type-converted here.

    Raises:
        OSError: if the file cannot be opened.
    """
    conf = {}
    # 'utf-8-sig' strips a leading BOM (typical of files saved by Windows
    # editors). With plain 'utf-8' the BOM would stay glued to the first key,
    # and lookups of that key would later fail with KeyError.
    with open(path, 'r', encoding='utf-8-sig') as fh:
        for lineno, raw in enumerate(fh, 1):
            line = raw.strip()
            if not line or line.startswith('#'):
                continue
            # line is already stripped and split(None, 1) consumes the
            # separating whitespace, so neither part needs further stripping.
            parts = line.split(None, 1)
            if len(parts) != 2:
                print('[tuning] 跳过无法解析的行 {p}:{n}: {l}'.format(
                    p=path, n=lineno, l=line))
                continue
            key, value = parts
            conf[key] = value
    return conf
# Speed fields merged field-by-field across L1/L2/L3.
_SPEED_KEYS = ('channel', 'byte', 'record')


def _in_relaxed_period(now_hhmm: int, start: int, stop: int) -> bool:
    """
    Return True if ``now_hhmm`` lies in the half-open HHMM window [start, stop).

    When start > stop the window wraps past midnight: start=1900, stop=750
    covers 1900..2359 and 0..749. start == stop is an empty window.
    """
    if start <= stop:
        return start <= now_hhmm < stop
    # Overnight window: inside if at/after start, or before stop.
    return now_hhmm >= start or now_hhmm < stop


def merge_speed(
    l1: Dict[str, str],
    ini_cp: ConfigParser,
    cli_overrides: Optional[Dict[str, int]] = None,
    now_hhmm: Optional[int] = None,
) -> Dict[str, int]:
    """
    Merge the DataX speed parameters (channel/byte/record) across three levels.

    Precedence is L1 conf < L2 ini [speed] < L3 CLI. The merge is per field:
    a higher level overrides only the fields it actually sets, not the whole
    section. Time-of-day bucketing applies only at L1:
    - ``now_hhmm`` inside [speed.relaxed_period.start,
      speed.relaxed_period.stop) selects the ``relaxed`` values;
    - any other time selects the ``strict`` values.
    The window may wrap past midnight (start > stop, e.g. 19:00 -> 07:50).
    Once L2 or L3 sets a field, the time period no longer matters for that
    field. Every decision is printed with a ``[tuning]`` prefix.

    Args:
        l1: flat dict returned by load_tuning_conf. It must contain
            speed.relaxed_period.start/stop and the
            speed.<bucket>.{channel,byte,record} keys of the selected bucket.
        ini_cp: ConfigParser of the DataX ini, already read. An empty value
            in its [speed] section does not override L1.
        cli_overrides: L3 values passed on the CLI, e.g.
            {'channel': 20, 'byte': None, 'record': None}. A value of None
            means that option was not given and does not override.
        now_hhmm: test injection point; defaults to the current local time
            as an HHMM integer.

    Returns:
        {'channel': int, 'byte': int, 'record': int}

    Raises:
        KeyError: a required L1 key is missing.
        ValueError: a period string is not a valid HH:MM, or a speed value
            is not an integer.
    """
    if now_hhmm is None:
        now_hhmm = int(time.strftime('%H%M'))

    # 1) L1: choose the bucket by time of day (inside the relaxed window ->
    #    relaxed, otherwise strict). The window may cross midnight.
    relaxed_start = _parse_hhmm(l1['speed.relaxed_period.start'])
    relaxed_stop = _parse_hhmm(l1['speed.relaxed_period.stop'])
    if _in_relaxed_period(now_hhmm, relaxed_start, relaxed_stop):
        bucket = 'relaxed'
    else:
        bucket = 'strict'
    print('[tuning] 当前时刻 {n:04d} → {b} 时段(宽松 [{s:04d}, {e:04d}))'.format(
        n=now_hhmm, b=bucket, s=relaxed_start, e=relaxed_stop))

    speed = {}
    for key in _SPEED_KEYS:
        l1_key = 'speed.{b}.{k}'.format(b=bucket, k=key)
        speed[key] = int(l1[l1_key])
        print('[tuning] L1 speed.{k} => {v}'.format(k=key, v=speed[key]))

    # 2) L2: ini [speed] section; blank values are treated as "not set".
    if ini_cp.has_section('speed'):
        for key in _SPEED_KEYS:
            if ini_cp.has_option('speed', key):
                raw = ini_cp.get('speed', key).strip()
                if raw:
                    speed[key] = int(raw)
                    print('[tuning] L2 ini [speed] {k} => {v} 覆盖 L1'.format(k=key, v=speed[key]))

    # 3) L3: CLI overrides; None means the option was not passed.
    if cli_overrides:
        for key in _SPEED_KEYS:
            if cli_overrides.get(key) is not None:
                speed[key] = int(cli_overrides[key])
                print('[tuning] L3 CLI -{k} => {v} 覆盖 L2'.format(k=key, v=speed[key]))

    print('[tuning] 最终 speed = {s}'.format(s=speed))
    return speed
|