runner.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。
  4. 流程(对齐老脚本 generate_job_config + run_single_datax_job):
  5. 1. 生成 json(调 bin/datax-job-config-generator.py,本机 subprocess / 远端 ssh)
  6. 2. 执行 datax.py(同样本机 / 远端分发)
  7. 生成 json 通过 `python3 -m dw_base.datax.cli gen-json` 调用(替代老 bin shim);
  8. worker 选择由 worker.select_worker 提供,本模块只接收最终 worker_host。
  9. """
  10. import os
  11. import shlex
  12. import subprocess
  13. from dw_base.datax import path_utils
  14. def run_job(ini_path: str,
  15. start_date: str,
  16. stop_date: str,
  17. worker_host: str,
  18. current_host: str,
  19. base_dir: str,
  20. python3_path: str,
  21. datax_home: str,
  22. skip_datax: bool = False,
  23. stdout=None,
  24. stderr=None) -> int:
  25. """
  26. 单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。
  27. Args:
  28. ini_path: DataX ini 绝对路径
  29. start_date / stop_date: yyyyMMdd
  30. worker_host: 已由 worker.select_worker 决定的目标节点
  31. current_host: hostname -s
  32. base_dir: 项目根绝对路径
  33. python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH)
  34. datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME)
  35. skip_datax: 只生成 json 不执行
  36. stdout / stderr: 传给 subprocess.run 的重定向 target(文件句柄等);
  37. None = 继承父进程。batch 并行模式传文件句柄隔离每任务日志
  38. Returns:
  39. datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0
  40. Raises:
  41. RuntimeError: 生成 json 失败
  42. """
  43. json_path = path_utils.json_output_path(base_dir, ini_path)
  44. os.makedirs(os.path.dirname(json_path), exist_ok=True)
  45. gen_argv = [
  46. python3_path, '-u',
  47. '-m', 'dw_base.datax.cli',
  48. 'gen-json', ini_path,
  49. '-start-date', start_date,
  50. '-stop-date', stop_date,
  51. ]
  52. gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
  53. stdout=stdout, stderr=stderr)
  54. if gen_rc != 0:
  55. raise RuntimeError('生成 DataX json 失败: ' + ini_path)
  56. if skip_datax:
  57. return 0
  58. exec_argv = [
  59. python3_path, '-u',
  60. os.path.join(datax_home, 'bin', 'datax.py'),
  61. json_path,
  62. ]
  63. return _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
  64. stdout=stdout, stderr=stderr)
  65. def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
  66. stdout=None, stderr=None) -> int:
  67. """
  68. 本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir(让 python -m dw_base.datax.cli 能找到包)
  69. 远端:ssh worker_host 'cd <base_dir> && <cmd>'(cwd 为项目根,同样保证 -m 能找到 dw_base)
  70. stdout/stderr 默认继承父进程;batch 层可传文件句柄做每任务独立日志。
  71. """
  72. if worker_host == current_host:
  73. env = os.environ.copy()
  74. existing = env.get('PYTHONPATH', '')
  75. env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
  76. return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
  77. remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
  78. return subprocess.run(['ssh', worker_host, remote_cmd], stdout=stdout, stderr=stderr).returncode