# -*- coding:utf-8 -*- """ 批量展开 + 串/并行调度(对应 bin/datax-multiple-job-starter.sh)。 - expand_ini_inputs: 合并 -ini 多次 + -inis 目录扫描,去重返回 ini 绝对路径列表 - run_batch: 对每个 ini 调 run_one(ini);串行 for / 并行 threading 不做分布式 worker 选择(由 runner 内调 worker.select_worker 处理)。 不做 log tee(stdout/stderr 重定向由调用方传给 runner.run_job)。 注:老 -gcd 目录扫描非递归(for `"${dir}"/*` 只看直接子项),这里保持一致。 """ import os import threading import time from typing import Callable, List, Tuple def expand_ini_inputs(inis: List[str], inis_dirs: List[str]) -> List[str]: """ 合并 -ini 多次 + -inis 目录两种输入。 目录扫描非递归,只保留 `.ini` 结尾文件;去重(按绝对路径)。 """ collected = [] seen = set() for ini in inis: ap = os.path.abspath(ini) if ap not in seen: seen.add(ap) collected.append(ap) for d in inis_dirs: if not os.path.isdir(d): raise ValueError('ini 目录不存在: ' + d) for entry in sorted(os.listdir(d)): if entry.endswith('.ini'): ap = os.path.abspath(os.path.join(d, entry)) if ap not in seen: seen.add(ap) collected.append(ap) return collected def run_batch(inis: List[str], run_one: Callable[[str], int], parallel: bool = False, sleep_between: float = 0.5) -> Tuple[int, int]: """ 对每个 ini 调用 run_one(ini)。 Args: inis: 绝对路径 ini 列表 run_one: ini_path -> returncode(0 成功) parallel: False 串行 for;True threading 并发 sleep_between: 并行模式每任务启动间隔(秒),对齐老 multiple-job-starter.sh:243 sleep 0.5s Returns: (success_count, failure_count) """ if not inis: return 0, 0 if not parallel: success = failed = 0 for ini in inis: if run_one(ini) == 0: success += 1 else: failed += 1 return success, failed threads = [] results = [None] * len(inis) def _worker(idx, ini_path): try: results[idx] = run_one(ini_path) except Exception: results[idx] = -1 for i, ini in enumerate(inis): t = threading.Thread(target=_worker, args=(i, ini)) t.start() threads.append(t) if sleep_between > 0: time.sleep(sleep_between) for t in threads: t.join() success = sum(1 for rc in results if rc == 0) failed = len(results) - success return success, failed