| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- # -*- coding:utf-8 -*-
- """
- DataX worker 选择工具(从 bin/common/init.sh:15-28 + bin/datax-single-job-starter.sh:30-55
- 的 select_worker 搬迁 Python 版)。
- 职责:
- - load_workers_ini: 读 conf/workers.ini 返回 WorkerPool(release_host + weights + 加权随机池)
- - select_worker: 三连回退规则决定单任务跑在哪台 worker
- 分发粒度:每次 select_worker 独立掷骰子,不做全局负载均衡(见 kb/91 §4.4)。
- """
- import random as _random
- from configparser import ConfigParser
- from typing import Dict, List, NamedTuple, Optional
class WorkerPool(NamedTuple):
    """Immutable bundle of the worker settings read from workers.ini."""

    # Value of the ``host`` option in the ``[release]`` section.
    release_host: str
    # Worker host name -> integer weight, taken from the ``[weights]`` section.
    weights: Dict[str, int]
    # Weighted random pool: each host name repeated ``weight`` times.
    queue: List[str]
def load_workers_ini(path: str) -> WorkerPool:
    """
    Load the worker configuration file and return a WorkerPool.

    Expected ini layout (see conf/workers.ini)::

        [release]
        host = cdhmaster02

        [weights]
        cdhmaster02 = 1
        cdhnode01 = 3
        ...

    Args:
        path: Path of the ini file. The file is decoded as UTF-8.

    Returns:
        A WorkerPool holding the release host, the host -> weight mapping and
        the weighted pool in which every host appears ``weight`` times. A
        host whose weight is zero or negative contributes no pool entries.

    Raises:
        OSError: The file cannot be opened (FileNotFoundError if it is
            missing).
        configparser.Error: The file is not valid ini syntax, or the
            ``[release]`` / ``host`` or ``[weights]`` entries are missing.
        ValueError: A weight is not an integer literal.
    """
    cp = ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, which would
    # only surface later as a misleading "No section: 'release'" error.
    # Opening the file explicitly makes a missing/unreadable config fail here,
    # and pinning the encoding keeps parsing independent of the host locale.
    with open(path, encoding='utf-8') as fh:
        cp.read_file(fh)

    release_host = cp.get('release', 'host').strip()

    # NOTE: ConfigParser lower-cases option names by default, so the host
    # names used as keys in [weights] come back lower-cased.
    weights: Dict[str, int] = {
        key.strip(): int(value.strip())
        for key, value in cp.items('weights')
    }

    # Expand each host ``weight`` times so that a uniform random pick from the
    # pool is a weighted pick over hosts.
    queue: List[str] = [
        host
        for host, weight in weights.items()
        for _ in range(weight)
    ]
    return WorkerPool(release_host=release_host, weights=weights, queue=queue)
def select_worker(pool: WorkerPool,
                  is_release_user: bool,
                  is_in_release_dir: bool,
                  current_host: str,
                  host: Optional[str] = None,
                  use_random: bool = False,
                  rand: Optional[_random.Random] = None) -> str:
    """
    Decide which worker a single job runs on.

    The rules are evaluated in order and the first match wins (ported from
    bin/datax-single-job-starter.sh:30-55):

    1. Not the release user        -> ``current_host``
    2. Not inside the release dir  -> ``current_host``
    3. ``host`` is given (truthy)  -> ``host``
    4. ``use_random`` is true      -> random pick from the weighted pool
    5. Otherwise                   -> ``current_host``

    Args:
        pool: WorkerPool whose ``queue`` is the weighted random pool.
        is_release_user: Whether the current user is the release user.
        is_in_release_dir: Whether the job runs from the release directory.
        current_host: Name of the host this process runs on.
        host: Explicitly requested worker; takes precedence over
            ``use_random``. ``None`` or an empty string means "not given".
        use_random: Whether to pick a worker at random from ``pool.queue``.
        rand: Random generator to use (injectable for tests); the
            module-level ``random`` is used when this is ``None``.

    Returns:
        The selected worker host name.

    Raises:
        IndexError: ``use_random`` applies but ``pool.queue`` is empty.
    """
    # Rules 1-2: outside the release user/directory the job always stays local.
    if not (is_release_user and is_in_release_dir):
        return current_host

    # Rule 3: an explicit host beats random selection.
    if host:
        return host

    # Rule 5: nothing requested -> stay on the current host.
    if not use_random:
        return current_host

    # Rule 4: weighted random pick. Fail with an actionable message instead
    # of random.choice's generic "Cannot choose from an empty sequence".
    if not pool.queue:
        raise IndexError(
            'worker pool is empty: no host with a positive weight is configured'
        )
    # Compare against None rather than truthiness so that an injected
    # generator is always honoured.
    rng = _random if rand is None else rand
    return rng.choice(pool.queue)
|