tuning.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX 运行时调优参数三级合并。
  4. 三级:
  5. - L1 conf/datax-tuning.conf:项目级默认(严格/宽松时段 + speed 三参)
  6. - L2 ini [speed] 段:单 ini 级覆盖
  7. - L3 CLI -channel/-byte/-record:运行时级覆盖
  8. 优先级:L1 < L2 < L3
  9. 合并发生在 JobConfigGenerator.assemble() 里,结果写进生成的 json。
  10. ini / conf 文件本身不改。
  11. """
  12. import time
  13. from configparser import ConfigParser
  14. from typing import Dict, Optional
  15. def _parse_hhmm(s: str) -> int:
  16. """
  17. 'HH:MM' → HHMM 整数,和 time.strftime('%H%M') 后 int() 的比较语义一致。
  18. '07:50' → 750;'19:00' → 1900;'00:05' → 5
  19. """
  20. s = s.strip()
  21. parts = s.split(':')
  22. if len(parts) != 2:
  23. raise ValueError('时段配置不是 HH:MM 格式: ' + s)
  24. try:
  25. h, m = int(parts[0]), int(parts[1])
  26. except ValueError:
  27. raise ValueError('时段配置非数字: ' + s)
  28. if not (0 <= h < 24 and 0 <= m < 60):
  29. raise ValueError('时段越界: ' + s)
  30. return h * 100 + m
  31. def load_tuning_conf(path: str) -> Dict[str, str]:
  32. """
  33. 读 conf/datax-tuning.conf,返回 {key: value_str} 扁平 dict。
  34. 格式:`key value`,# 注释行,空行忽略(对齐 conf/spark-tuning.conf 风格)。
  35. """
  36. conf = {}
  37. with open(path, 'r', encoding='utf-8') as fh:
  38. for line in fh:
  39. line = line.strip()
  40. if not line or line.startswith('#'):
  41. continue
  42. parts = line.split(None, 1)
  43. if len(parts) != 2:
  44. continue
  45. key, value = parts
  46. conf[key.strip()] = value.strip()
  47. return conf
  48. def merge_speed(
  49. l1: Dict[str, str],
  50. ini_cp: ConfigParser,
  51. cli_overrides: Optional[Dict[str, int]] = None,
  52. now_hhmm: Optional[int] = None,
  53. ) -> Dict[str, int]:
  54. """
  55. 合并 speed 三参(channel/byte/record):L1 < L2 ini [speed] < L3 CLI。
  56. 时段分档仅在 L1 层生效:L1 按 now 属严格/宽松选档;一旦 L2/L3 指定某字段,
  57. 直接覆盖,不再看时段。逐字段 merge,非整段替换。
  58. Args:
  59. l1: load_tuning_conf 返回的扁平 dict
  60. ini_cp: DataX ini 的 ConfigParser(已 read)
  61. cli_overrides: L3 CLI 传入,形如 {'channel': 20, 'byte': None, 'record': None}
  62. value 为 None 表示 CLI 未传该项,不覆盖
  63. now_hhmm: 测试注入用;默认当前系统时刻 HHMM 整数
  64. Returns:
  65. {'channel': int, 'byte': int, 'record': int}
  66. """
  67. if now_hhmm is None:
  68. now_hhmm = int(time.strftime('%H%M'))
  69. # 1) L1:按时段选档(以宽松时段为基准,区间内 relaxed、区间外 strict)
  70. relaxed_start = _parse_hhmm(l1['speed.relaxed_period.start'])
  71. relaxed_stop = _parse_hhmm(l1['speed.relaxed_period.stop'])
  72. if relaxed_start <= now_hhmm < relaxed_stop:
  73. bucket = 'relaxed'
  74. else:
  75. bucket = 'strict'
  76. print('[tuning] 当前时刻 {n:04d} → {b} 时段(宽松 [{s:04d}, {e:04d}))'.format(
  77. n=now_hhmm, b=bucket, s=relaxed_start, e=relaxed_stop))
  78. speed = {}
  79. for key in ('channel', 'byte', 'record'):
  80. l1_key = 'speed.{b}.{k}'.format(b=bucket, k=key)
  81. speed[key] = int(l1[l1_key])
  82. print('[tuning] L1 speed.{k} => {v}'.format(k=key, v=speed[key]))
  83. # 2) L2 ini [speed] 段
  84. if ini_cp.has_section('speed'):
  85. for key in ('channel', 'byte', 'record'):
  86. if ini_cp.has_option('speed', key):
  87. raw = ini_cp.get('speed', key).strip()
  88. if raw:
  89. speed[key] = int(raw)
  90. print('[tuning] L2 ini [speed] {k} => {v} 覆盖 L1'.format(k=key, v=speed[key]))
  91. # 3) L3 CLI
  92. if cli_overrides:
  93. for key in ('channel', 'byte', 'record'):
  94. if cli_overrides.get(key) is not None:
  95. speed[key] = int(cli_overrides[key])
  96. print('[tuning] L3 CLI -{k} => {v} 覆盖 L2'.format(k=key, v=speed[key]))
  97. print('[tuning] 最终 speed = {s}'.format(s=speed))
  98. return speed