Browse Source

feat(datax): 新增 batch 模块(批量展开 + 串/并行调度)

expand_ini_inputs: 合并 -ini 多次 + -inis 目录非递归扫描 + 去重
run_batch: 串行 for / 并行 threading,每任务间 sleep 0.5s(对齐老脚本)
分布式 worker 选择不在本层(由 runner 内调 worker.select_worker)
log tee 不在本层(stdout/stderr 重定向传给 runner.run_job)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 2 weeks ago
parent
commit
04c3f6a119
1 changed files with 89 additions and 0 deletions
  1. 89 0
      dw_base/datax/batch.py

+ 89 - 0
dw_base/datax/batch.py

@@ -0,0 +1,89 @@
+# -*- coding:utf-8 -*-
+"""
+批量展开 + 串/并行调度(对应 bin/datax-multiple-job-starter.sh)。
+
+- expand_ini_inputs: 合并 -ini 多次 + -inis 目录扫描,去重返回 ini 绝对路径列表
+- run_batch: 对每个 ini 调 run_one(ini);串行 for / 并行 threading
+
+不做分布式 worker 选择(由 runner 内调 worker.select_worker 处理)。
+不做 log tee(stdout/stderr 重定向由调用方传给 runner.run_job)。
+
+注:老 -gcd 目录扫描非递归(for `"${dir}"/*` 只看直接子项),这里保持一致。
+"""
+import os
+import threading
+import time
+from typing import Callable, List, Tuple
+
+
+def expand_ini_inputs(inis: List[str], inis_dirs: List[str]) -> List[str]:
+    """
+    合并 -ini 多次 + -inis 目录两种输入。
+    目录扫描非递归,只保留 `.ini` 结尾文件;去重(按绝对路径)。
+    """
+    collected = []
+    seen = set()
+    for ini in inis:
+        ap = os.path.abspath(ini)
+        if ap not in seen:
+            seen.add(ap)
+            collected.append(ap)
+    for d in inis_dirs:
+        if not os.path.isdir(d):
+            raise ValueError('ini 目录不存在: ' + d)
+        for entry in sorted(os.listdir(d)):
+            if entry.endswith('.ini'):
+                ap = os.path.abspath(os.path.join(d, entry))
+                if ap not in seen:
+                    seen.add(ap)
+                    collected.append(ap)
+    return collected
+
+
+def run_batch(inis: List[str],
+              run_one: Callable[[str], int],
+              parallel: bool = False,
+              sleep_between: float = 0.5) -> Tuple[int, int]:
+    """
+    对每个 ini 调用 run_one(ini)。
+
+    Args:
+        inis: 绝对路径 ini 列表
+        run_one: ini_path -> returncode(0 成功)
+        parallel: False 串行 for;True threading 并发
+        sleep_between: 并行模式每任务启动间隔(秒),对齐老 multiple-job-starter.sh:243 sleep 0.5s
+
+    Returns: (success_count, failure_count)
+    """
+    if not inis:
+        return 0, 0
+
+    if not parallel:
+        success = failed = 0
+        for ini in inis:
+            if run_one(ini) == 0:
+                success += 1
+            else:
+                failed += 1
+        return success, failed
+
+    threads = []
+    results = [None] * len(inis)
+
+    def _worker(idx, ini_path):
+        try:
+            results[idx] = run_one(ini_path)
+        except Exception:
+            results[idx] = -1
+
+    for i, ini in enumerate(inis):
+        t = threading.Thread(target=_worker, args=(i, ini))
+        t.start()
+        threads.append(t)
+        if sleep_between > 0:
+            time.sleep(sleep_between)
+    for t in threads:
+        t.join()
+    success = sum(1 for rc in results if rc == 0)
+    failed = len(results) - success
+    return success, failed