| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # -*- coding:utf-8 -*-
- """
- 批量展开 + 串/并行调度(对应 bin/datax-multiple-job-starter.sh)。
- - expand_ini_inputs: 合并 -ini 多次 + -inis 目录扫描,去重返回 ini 绝对路径列表
- - run_batch: 对每个 ini 调 run_one(ini);串行 for / 并行 threading
- 不做分布式 worker 选择(由 runner 内调 worker.select_worker 处理)。
- 不做 log tee(stdout/stderr 重定向由调用方传给 runner.run_job)。
- 注:老 -gcd 目录扫描非递归(for `"${dir}"/*` 只看直接子项),这里保持一致。
- """
- import os
- import threading
- import time
- from typing import Callable, List, Tuple
- def expand_ini_inputs(inis: List[str], inis_dirs: List[str]) -> List[str]:
- """
- 合并 -ini 多次 + -inis 目录两种输入。
- 目录扫描非递归,只保留 `.ini` 结尾文件;去重(按绝对路径)。
- """
- collected = []
- seen = set()
- for ini in inis:
- ap = os.path.abspath(ini)
- if ap not in seen:
- seen.add(ap)
- collected.append(ap)
- for d in inis_dirs:
- if not os.path.isdir(d):
- raise ValueError('ini 目录不存在: ' + d)
- for entry in sorted(os.listdir(d)):
- if entry.endswith('.ini'):
- ap = os.path.abspath(os.path.join(d, entry))
- if ap not in seen:
- seen.add(ap)
- collected.append(ap)
- return collected
- def run_batch(inis: List[str],
- run_one: Callable[[str], int],
- parallel: bool = False,
- sleep_between: float = 0.5) -> Tuple[int, int]:
- """
- 对每个 ini 调用 run_one(ini)。
- Args:
- inis: 绝对路径 ini 列表
- run_one: ini_path -> returncode(0 成功)
- parallel: False 串行 for;True threading 并发
- sleep_between: 并行模式每任务启动间隔(秒),对齐老 multiple-job-starter.sh:243 sleep 0.5s
- Returns: (success_count, failure_count)
- """
- if not inis:
- return 0, 0
- if not parallel:
- success = failed = 0
- for ini in inis:
- if run_one(ini) == 0:
- success += 1
- else:
- failed += 1
- return success, failed
- threads = []
- results = [None] * len(inis)
- def _worker(idx, ini_path):
- try:
- results[idx] = run_one(ini_path)
- except Exception:
- results[idx] = -1
- for i, ini in enumerate(inis):
- t = threading.Thread(target=_worker, args=(i, ini))
- t.start()
- threads.append(t)
- if sleep_between > 0:
- time.sleep(sleep_between)
- for t in threads:
- t.join()
- success = sum(1 for rc in results if rc == 0)
- failed = len(results) - success
- return success, failed
|