worker.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. # -*- coding:utf-8 -*-
  2. """
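DataX worker selection utilities (Python port of select_worker from
bin/common/init.sh:15-28 and bin/datax-single-job-starter.sh:30-55).

Responsibilities:
- load_workers_ini: read conf/workers.ini and return a WorkerPool
  (release_host + weights + weighted random pool)
- select_worker: apply the three-step fallback rules to decide which worker
  a single job runs on

Dispatch granularity: every select_worker call rolls the dice independently;
there is no global load balancing (see kb/91 §4.4).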
  9. """
  10. import random as _random
  11. from configparser import ConfigParser
  12. from typing import Dict, List, NamedTuple, Optional
  13. class WorkerPool(NamedTuple):
  14. release_host: str
  15. weights: Dict[str, int]
  16. queue: List[str] # 按权重展开后的加权随机池
  17. def load_workers_ini(path: str) -> WorkerPool:
  18. """
  19. 读 conf/workers.ini 返回 WorkerPool。
  20. ini 格式(见 conf/workers.ini):
  21. [release]
  22. host = cdhmaster02
  23. [weights]
  24. cdhmaster02 = 1
  25. cdhnode01 = 3
  26. ...
  27. """
  28. cp = ConfigParser()
  29. cp.read(path)
  30. release_host = cp.get('release', 'host').strip()
  31. weights = {} # type: Dict[str, int]
  32. for key, value in cp.items('weights'):
  33. weights[key.strip()] = int(value.strip())
  34. queue = [] # type: List[str]
  35. for host, w in weights.items():
  36. queue.extend([host] * w)
  37. return WorkerPool(release_host=release_host, weights=weights, queue=queue)
  38. def select_worker(pool: WorkerPool,
  39. is_release_user: bool,
  40. is_in_release_dir: bool,
  41. current_host: str,
  42. host: Optional[str] = None,
  43. use_random: bool = False,
  44. rand: Optional[_random.Random] = None) -> str:
  45. """
  46. 三连回退规则选 worker(对齐 bin/datax-single-job-starter.sh:30-55):
  47. 1. 非 release 用户 → current_host
  48. 2. 不在 release 目录 → current_host
  49. 3. 指定了 host → 用 host
  50. 4. use_random=True → 从加权池随机选
  51. 5. 都不指定 → current_host
  52. Args:
  53. pool: 加载好的 WorkerPool
  54. is_release_user: 当前 USER 是否 == conf/env.sh 的 RELEASE_USER
  55. is_in_release_dir: BASE_DIR 是否落在 RELEASE_ROOT_DIR 下
  56. current_host: 当前主机名(hostname -s)
  57. host: 显式指定的 worker 名(优先于 use_random)
  58. use_random: 是否从加权池随机
  59. rand: 注入的 Random 实例(单测用),默认 module 级 random
  60. Returns:
  61. 被选中的 worker 主机名
  62. """
  63. if not is_release_user:
  64. return current_host
  65. if not is_in_release_dir:
  66. return current_host
  67. if host:
  68. return host
  69. if use_random:
  70. r = rand or _random
  71. return r.choice(pool.queue)
  72. return current_host