# -*- coding:utf-8 -*-
"""
DataX worker selection utilities.

Python port of the ``select_worker`` logic in bin/common/init.sh:15-28 and
bin/datax-single-job-starter.sh:30-55.

Responsibilities:
    - load_workers_ini: read conf/workers.ini and return a WorkerPool
      (release_host + weights + weighted random pool).
    - select_worker: apply the ordered fallback rules that decide which
      worker a single job runs on.

Dispatch granularity: every select_worker call rolls the dice independently;
there is no global load balancing (see kb/91 §4.4).
"""
import random as _random
from configparser import ConfigParser
from typing import Dict, List, NamedTuple, Optional


class WorkerPool(NamedTuple):
    release_host: str
    weights: Dict[str, int]
    queue: List[str]  # each host repeated `weight` times: the weighted random pool


def load_workers_ini(path: str) -> WorkerPool:
    """
    Read conf/workers.ini and return a WorkerPool.

    Expected format (see conf/workers.ini)::

        [release]
        host = cdhmaster02

        [weights]
        cdhmaster02 = 1
        cdhnode01 = 3
        ...

    Host names under [weights] are kept exactly as written (case preserved),
    consistent with how the [release] host value is read. The file is decoded
    as UTF-8.

    Args:
        path: path to the ini file.

    Returns:
        WorkerPool whose ``queue`` repeats each host ``weight`` times, in
        [weights] order. A weight <= 0 contributes no entries to ``queue``.

    Raises:
        configparser.NoSectionError / configparser.NoOptionError: the
            [release] section, its ``host`` option, or the [weights] section is
            missing. ConfigParser.read silently skips files it cannot open, so
            a missing file also surfaces as NoSectionError for [release].
        ValueError: a weight is not an integer.
    """
    cp = ConfigParser()
    # ConfigParser lowercases option names by default. The [weights] option
    # names ARE the host names, so keep them verbatim; otherwise "CdhNode01"
    # would silently become "cdhnode01" while [release] host keeps its case.
    cp.optionxform = str  # type: ignore
    # Explicit encoding: the default is locale-dependent and fails on
    # non-ASCII comments under a C/POSIX locale.
    cp.read(path, encoding='utf-8')

    release_host = cp.get('release', 'host').strip()

    weights = {}  # type: Dict[str, int]
    for key, value in cp.items('weights'):
        weights[key.strip()] = int(value.strip())

    queue = []  # type: List[str]
    for host, w in weights.items():
        queue.extend([host] * w)

    return WorkerPool(release_host=release_host, weights=weights, queue=queue)


def select_worker(pool: WorkerPool,
                  is_release_user: bool,
                  is_in_release_dir: bool,
                  current_host: str,
                  host: Optional[str] = None,
                  use_random: bool = False,
                  rand: Optional[_random.Random] = None) -> str:
    """
    Pick the worker a single job runs on.

    Mirrors bin/datax-single-job-starter.sh:30-55. Rules are checked in
    order and the first match wins:

        1. not the release user        -> current_host
        2. not inside the release dir  -> current_host
        3. explicit ``host`` given     -> host
        4. use_random=True             -> weighted random pick from pool.queue
        5. otherwise                   -> current_host

    Args:
        pool: a loaded WorkerPool.
        is_release_user: whether the current USER equals RELEASE_USER from
            conf/env.sh.
        is_in_release_dir: whether BASE_DIR lies under RELEASE_ROOT_DIR.
        current_host: the current host name (``hostname -s``).
        host: explicitly requested worker; takes precedence over use_random.
            An empty string is treated as "not given".
        use_random: pick from the weighted pool instead of current_host.
        rand: injected Random instance (for tests); defaults to the
            module-level ``random`` functions.

    Returns:
        The selected worker host name.

    Raises:
        IndexError: rule 4 applies but pool.queue is empty (no host has a
            positive weight).
    """
    if not is_release_user:
        return current_host
    if not is_in_release_dir:
        return current_host
    if host:
        return host
    if use_random:
        if not pool.queue:
            # Same exception type random.choice would raise, but actionable.
            raise IndexError(
                'worker pool is empty: no host with a positive weight in [weights]')
        r = rand or _random
        return r.choice(pool.queue)
    return current_host