runner.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。
  4. 流程(对齐老脚本 generate_job_config + run_single_datax_job):
  5. 1. 生成 json(调 bin/datax-job-config-generator.py,本机 subprocess / 远端 ssh)
  6. 2. 执行 datax.py(同样本机 / 远端分发)
  7. 注:本版继续透过老 shim bin/datax-job-config-generator.py 生成 json(批次 4 删老
  8. 脚本时再改为直接 import JobConfigGenerator);worker 选择由 worker.select_worker 提供,
  9. 本模块只接收最终 worker_host。
  10. """
  11. import os
  12. import shlex
  13. import subprocess
  14. from dw_base.datax import path_utils
  15. def run_job(ini_path: str,
  16. start_date: str,
  17. stop_date: str,
  18. worker_host: str,
  19. current_host: str,
  20. base_dir: str,
  21. python3_path: str,
  22. datax_home: str,
  23. skip_datax: bool = False) -> int:
  24. """
  25. 单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。
  26. Args:
  27. ini_path: DataX ini 绝对路径
  28. start_date / stop_date: yyyyMMdd
  29. worker_host: 已由 worker.select_worker 决定的目标节点
  30. current_host: hostname -s
  31. base_dir: 项目根绝对路径
  32. python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH)
  33. datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME)
  34. skip_datax: 只生成 json 不执行
  35. Returns:
  36. datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0
  37. Raises:
  38. RuntimeError: 生成 json 失败
  39. """
  40. json_path = path_utils.json_output_path(base_dir, ini_path)
  41. os.makedirs(os.path.dirname(json_path), exist_ok=True)
  42. gen_argv = [
  43. python3_path, '-u',
  44. os.path.join(base_dir, 'bin', 'datax-job-config-generator.py'),
  45. '-c', ini_path,
  46. '-start-date', start_date,
  47. '-stop-date', stop_date,
  48. ]
  49. gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host)
  50. if gen_rc != 0:
  51. raise RuntimeError('生成 DataX json 失败: ' + ini_path)
  52. if skip_datax:
  53. return 0
  54. exec_argv = [
  55. python3_path, '-u',
  56. os.path.join(datax_home, 'bin', 'datax.py'),
  57. json_path,
  58. ]
  59. return _run_local_or_remote(exec_argv, worker_host, current_host)
  60. def _run_local_or_remote(argv, worker_host: str, current_host: str) -> int:
  61. """
  62. 本机 = subprocess.run 直跑;远端 = ssh worker_host '<cmd>' 传字符串。
  63. stdout/stderr 继承父进程,由上游(batch 层)决定 tee / 重定向。
  64. """
  65. if worker_host == current_host:
  66. return subprocess.run(argv).returncode
  67. remote_cmd = ' '.join(shlex.quote(a) for a in argv)
  68. return subprocess.run(['ssh', worker_host, remote_cmd]).returncode