Sfoglia il codice sorgente

refactor(datax): tuning 时段基准 strict→relaxed(解决跨午夜表达问题)

tianyu.chu 1 settimana fa
parent
commit
453d79d56c
3 ha cambiato i file con 35 aggiunte e 35 eliminazioni
  1. 6 6
      conf/datax-tuning.conf
  2. 8 8
      dw_base/datax/tuning.py
  3. 21 21
      tests/unit/datax/test_tuning.py

+ 6 - 6
conf/datax-tuning.conf

@@ -3,12 +3,12 @@
 # 加载入口:dw_base/datax/job_config_generator.py 生成 json 前
 
 # ========== speed ==========
-# 严格时段:显式指定起止(HH:MM 24 小时制)
-# 此区间内走 strict 资源档(每通道吞吐低,保护业务 DB 从库
-# 此区间外自动走 relaxed 档(每通道吞吐高
-# 判断采用左闭右开 [start, stop);start 时分属严格,stop 时分属宽松
-speed.strict_period.start          12:00
-speed.strict_period.stop           06:00
+# 宽松时段:显式指定起止(HH:MM 24 小时制)
+# 此区间内走 relaxed 资源档(每通道吞吐高,业务低峰可加速
+# 此区间外自动走 strict 档(每通道吞吐低,保护业务 DB 从库
+# 判断采用左闭右开 [start, stop);start 时分属宽松,stop 时分属严格
+speed.relaxed_period.start         06:00
+speed.relaxed_period.stop          12:00
 
 speed.strict.channel               10
 speed.strict.byte                  10485760

+ 8 - 8
dw_base/datax/tuning.py

@@ -79,15 +79,15 @@ def merge_speed(
     if now_hhmm is None:
         now_hhmm = int(time.strftime('%H%M'))
 
-    # 1) L1:按时段选档
-    strict_start = _parse_hhmm(l1['speed.strict_period.start'])
-    strict_stop = _parse_hhmm(l1['speed.strict_period.stop'])
-    if strict_start <= now_hhmm < strict_stop:
-        bucket = 'strict'
-    else:
+    # 1) L1:按时段选档(以宽松时段为基准,区间内 relaxed、区间外 strict)
+    relaxed_start = _parse_hhmm(l1['speed.relaxed_period.start'])
+    relaxed_stop = _parse_hhmm(l1['speed.relaxed_period.stop'])
+    if relaxed_start <= now_hhmm < relaxed_stop:
         bucket = 'relaxed'
-    print('[tuning] 当前时刻 {n:04d} → {b} 时段(严格 [{s:04d}, {e:04d}))'.format(
-        n=now_hhmm, b=bucket, s=strict_start, e=strict_stop))
+    else:
+        bucket = 'strict'
+    print('[tuning] 当前时刻 {n:04d} → {b} 时段(宽松 [{s:04d}, {e:04d}))'.format(
+        n=now_hhmm, b=bucket, s=relaxed_start, e=relaxed_stop))
 
     speed = {}
     for key in ('channel', 'byte', 'record'):

+ 21 - 21
tests/unit/datax/test_tuning.py

@@ -43,8 +43,8 @@ def test_load_tuning_conf(tmp_path):
 
 def _mk_l1():
     return {
-        'speed.strict_period.start': '07:50',
-        'speed.strict_period.stop': '19:00',
+        'speed.relaxed_period.start': '06:00',
+        'speed.relaxed_period.stop': '12:00',
         'speed.strict.channel': '10',
         'speed.strict.byte': '10485760',
         'speed.strict.record': '40000',
@@ -54,32 +54,32 @@ def _mk_l1():
     }
 
 
-def test_merge_speed_l1_strict_period():
-    """now=1000 属严格时段,走 L1 strict 档。"""
+def test_merge_speed_l1_relaxed_period():
+    """now=1000 在宽松区间 [600, 1200) 内,走 L1 relaxed 档。"""
     ini = ConfigParser()
     speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
-    assert speed == {'channel': 10, 'byte': 10485760, 'record': 40000}
+    assert speed == {'channel': 6, 'byte': 268435456, 'record': 100000}
 
 
-def test_merge_speed_l1_relaxed_period():
-    """now=2000 属宽松时段,走 L1 relaxed 档。"""
+def test_merge_speed_l1_strict_period():
+    """now=1300 在宽松区间外,走 L1 strict 档。"""
     ini = ConfigParser()
-    speed = merge_speed(_mk_l1(), ini, now_hhmm=2000)
-    assert speed == {'channel': 6, 'byte': 268435456, 'record': 100000}
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=1300)
+    assert speed == {'channel': 10, 'byte': 10485760, 'record': 40000}
 
 
-def test_merge_speed_boundary_strict_start_inclusive():
-    """now 恰 == strict_start(左闭)属严格。"""
+def test_merge_speed_boundary_relaxed_start_inclusive():
+    """now 恰 == relaxed_start(左闭)属宽松。"""
     ini = ConfigParser()
-    speed = merge_speed(_mk_l1(), ini, now_hhmm=750)
-    assert speed['channel'] == 10  # strict
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=600)
+    assert speed['channel'] == 6  # relaxed
 
 
-def test_merge_speed_boundary_strict_stop_exclusive():
-    """now 恰 == strict_stop(右开)属宽松。"""
+def test_merge_speed_boundary_relaxed_stop_exclusive():
+    """now 恰 == relaxed_stop(右开)属严格。"""
     ini = ConfigParser()
-    speed = merge_speed(_mk_l1(), ini, now_hhmm=1900)
-    assert speed['channel'] == 6  # relaxed
+    speed = merge_speed(_mk_l1(), ini, now_hhmm=1200)
+    assert speed['channel'] == 10  # strict
 
 
 def test_merge_speed_l2_ini_overrides_l1_per_field():
@@ -88,7 +88,7 @@ def test_merge_speed_l2_ini_overrides_l1_per_field():
     ini.add_section('speed')
     ini.set('speed', 'channel', '20')
     speed = merge_speed(_mk_l1(), ini, now_hhmm=1000)
-    assert speed == {'channel': 20, 'byte': 10485760, 'record': 40000}
+    assert speed == {'channel': 20, 'byte': 268435456, 'record': 100000}
 
 
 def test_merge_speed_l3_cli_overrides_l2_l1():
@@ -98,8 +98,8 @@ def test_merge_speed_l3_cli_overrides_l2_l1():
     ini.set('speed', 'channel', '20')
     cli = {'channel': 30, 'byte': None, 'record': 50000}
     speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
-    # channel: L3=30 胜;byte: None 不覆盖,回落 L1 strict;record: L3=50000
-    assert speed == {'channel': 30, 'byte': 10485760, 'record': 50000}
+    # channel: L3=30 胜;byte: None 不覆盖,回落 L1 relaxed;record: L3=50000
+    assert speed == {'channel': 30, 'byte': 268435456, 'record': 50000}
 
 
 def test_merge_speed_l3_only_without_ini_section():
@@ -107,4 +107,4 @@ def test_merge_speed_l3_only_without_ini_section():
     ini = ConfigParser()
     cli = {'channel': 15, 'byte': None, 'record': None}
     speed = merge_speed(_mk_l1(), ini, cli_overrides=cli, now_hhmm=1000)
-    assert speed == {'channel': 15, 'byte': 10485760, 'record': 40000}
+    assert speed == {'channel': 15, 'byte': 268435456, 'record': 100000}