Przeglądaj źródła

feat(datax): 新增 worker 模块(workers.ini 加载 + 加权随机选)

load_workers_ini 读 conf/workers.ini 返回 WorkerPool (NamedTuple)
select_worker 复刻老 bin/datax-single-job-starter.sh:30-55 三连回退
不做全局负载均衡(单次独立掷骰子,与老逻辑一致)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 2 tygodni temu
rodzic
commit
83c918509b
1 zmienionych plików z 84 dodań i 0 usunięć
  1. 84 0
      dw_base/datax/worker.py

+ 84 - 0
dw_base/datax/worker.py

@@ -0,0 +1,84 @@
+# -*- coding:utf-8 -*-
+"""
+DataX worker 选择工具(从 bin/common/init.sh:15-28 + bin/datax-single-job-starter.sh:30-55
+的 select_worker 搬迁 Python 版)。
+
+职责:
+- load_workers_ini: 读 conf/workers.ini 返回 WorkerPool(release_host + weights + 加权随机池)
+- select_worker: 三连回退规则决定单任务跑在哪台 worker
+
+分发粒度:每次 select_worker 独立掷骰子,不做全局负载均衡(见 kb/91 §4.4)。
+"""
+import random as _random
+from configparser import ConfigParser
+from typing import Dict, List, NamedTuple, Optional
+
+
+class WorkerPool(NamedTuple):
+    release_host: str
+    weights: Dict[str, int]
+    queue: List[str]  # 按权重展开后的加权随机池
+
+
+def load_workers_ini(path: str) -> WorkerPool:
+    """
+    读 conf/workers.ini 返回 WorkerPool。
+
+    ini 格式(见 conf/workers.ini):
+        [release]
+        host = cdhmaster02
+        [weights]
+        cdhmaster02 = 1
+        cdhnode01 = 3
+        ...
+    """
+    cp = ConfigParser()
+    cp.read(path)
+    release_host = cp.get('release', 'host').strip()
+    weights = {}  # type: Dict[str, int]
+    for key, value in cp.items('weights'):
+        weights[key.strip()] = int(value.strip())
+    queue = []  # type: List[str]
+    for host, w in weights.items():
+        queue.extend([host] * w)
+    return WorkerPool(release_host=release_host, weights=weights, queue=queue)
+
+
+def select_worker(pool: WorkerPool,
+                  is_release_user: bool,
+                  is_in_release_dir: bool,
+                  current_host: str,
+                  host: Optional[str] = None,
+                  use_random: bool = False,
+                  rand: Optional[_random.Random] = None) -> str:
+    """
+    三连回退规则选 worker(对齐 bin/datax-single-job-starter.sh:30-55):
+
+    1. 非 release 用户 → current_host
+    2. 不在 release 目录 → current_host
+    3. 指定了 host → 用 host
+    4. use_random=True → 从加权池随机选
+    5. 都不指定 → current_host
+
+    Args:
+        pool: 加载好的 WorkerPool
+        is_release_user: 当前 USER 是否 == conf/env.sh 的 RELEASE_USER
+        is_in_release_dir: BASE_DIR 是否落在 RELEASE_ROOT_DIR 下
+        current_host: 当前主机名(hostname -s)
+        host: 显式指定的 worker 名(优先于 use_random)
+        use_random: 是否从加权池随机
+        rand: 注入的 Random 实例(单测用),默认 module 级 random
+
+    Returns:
+        被选中的 worker 主机名
+    """
+    if not is_release_user:
+        return current_host
+    if not is_in_release_dir:
+        return current_host
+    if host:
+        return host
+    if use_random:
+        r = rand or _random
+        return r.choice(pool.queue)
+    return current_host