batch.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # -*- coding:utf-8 -*-
  2. """
  3. 批量展开 + 串/并行调度(对应 bin/datax-multiple-job-starter.sh)。
  4. - expand_ini_inputs: 合并 -ini 多次 + -inis 目录扫描,去重返回 ini 绝对路径列表
  5. - run_batch: 对每个 ini 调 run_one(ini);串行 for / 并行 threading
  6. 不做分布式 worker 选择(由 runner 内调 worker.select_worker 处理)。
  7. 不做 log tee(stdout/stderr 重定向由调用方传给 runner.run_job)。
  8. 注:老 -gcd 目录扫描非递归(for `"${dir}"/*` 只看直接子项),这里保持一致。
  9. """
  10. import os
  11. import threading
  12. import time
  13. from typing import Callable, List, Tuple
  14. def expand_ini_inputs(inis: List[str], inis_dirs: List[str]) -> List[str]:
  15. """
  16. 合并 -ini 多次 + -inis 目录两种输入。
  17. 目录扫描非递归,只保留 `.ini` 结尾文件;去重(按绝对路径)。
  18. """
  19. collected = []
  20. seen = set()
  21. for ini in inis:
  22. ap = os.path.abspath(ini)
  23. if ap not in seen:
  24. seen.add(ap)
  25. collected.append(ap)
  26. for d in inis_dirs:
  27. if not os.path.isdir(d):
  28. raise ValueError('ini 目录不存在: ' + d)
  29. for entry in sorted(os.listdir(d)):
  30. if entry.endswith('.ini'):
  31. ap = os.path.abspath(os.path.join(d, entry))
  32. if ap not in seen:
  33. seen.add(ap)
  34. collected.append(ap)
  35. return collected
  36. def run_batch(inis: List[str],
  37. run_one: Callable[[str], int],
  38. parallel: bool = False,
  39. sleep_between: float = 0.5) -> Tuple[int, int]:
  40. """
  41. 对每个 ini 调用 run_one(ini)。
  42. Args:
  43. inis: 绝对路径 ini 列表
  44. run_one: ini_path -> returncode(0 成功)
  45. parallel: False 串行 for;True threading 并发
  46. sleep_between: 并行模式每任务启动间隔(秒),对齐老 multiple-job-starter.sh:243 sleep 0.5s
  47. Returns: (success_count, failure_count)
  48. """
  49. if not inis:
  50. return 0, 0
  51. if not parallel:
  52. success = failed = 0
  53. for ini in inis:
  54. if run_one(ini) == 0:
  55. success += 1
  56. else:
  57. failed += 1
  58. return success, failed
  59. threads = []
  60. results = [None] * len(inis)
  61. def _worker(idx, ini_path):
  62. try:
  63. results[idx] = run_one(ini_path)
  64. except Exception:
  65. results[idx] = -1
  66. for i, ini in enumerate(inis):
  67. t = threading.Thread(target=_worker, args=(i, ini))
  68. t.start()
  69. threads.append(t)
  70. if sleep_between > 0:
  71. time.sleep(sleep_between)
  72. for t in threads:
  73. t.join()
  74. success = sum(1 for rc in results if rc == 0)
  75. failed = len(results) - success
  76. return success, failed