# -*- coding:utf-8 -*-
"""
DataX single-job executor (Python port of bin/datax-single-job-starter.sh).

Flow (mirrors the old generate_job_config + run_single_datax_job):
1. (hdfs reader case) HDFS source-path existence check, mirroring the old check_data_exists
2. generate the json (invoke -m dw_base.datax.cli gen-json; local subprocess / remote ssh)
3. run datax.py (same local / remote dispatch)

stdout output, one of three modes:
- default stdout=None, tee_to=None -> inherit the parent process (simplest)
- stdout=fh, tee_to=None -> write fh only (parallel mode, no echo to parent stdout; mirrors the old `> LOG 2>&1 &`)
- stdout=None, tee_to=fh -> tee to fh + parent stdout (serial mode; mirrors the old `| tee LOG_FILE`)
"""
import os
import shlex
import subprocess
import sys
from configparser import ConfigParser

from dw_base.datax import path_utils
  19. def _hdfs_src_check(ini_path: str, start_date: str) -> str:
  20. """
  21. HDFS 源路径存在性 + 空检查(对齐老 datax-single-job-starter.sh:128-146 check_data_exists)。
  22. 仅对 reader 是 hdfs 的 ini 触发;其他 reader 返回 'n/a'。
  23. Returns:
  24. 'ok' - 路径存在且有数据
  25. 'missing' - 路径不存在(hadoop fs -test -e 失败)
  26. 'empty' - 路径存在但空(hadoop fs -du -s 返回 0)
  27. 'n/a' - 非 hdfs reader,不需要检查
  28. """
  29. cp = ConfigParser()
  30. cp.read(ini_path)
  31. if not cp.has_option('reader', 'dataSource'):
  32. return 'n/a'
  33. ds = cp.get('reader', 'dataSource')
  34. if not ds.startswith('hdfs/'):
  35. return 'n/a'
  36. if not cp.has_option('reader', 'path'):
  37. return 'n/a'
  38. path = cp.get('reader', 'path')
  39. # 替换占位符(对齐 hdfs_reader.load_others L27-32)
  40. path = path.replace('${start_date}', start_date)
  41. path = path.replace('${start-date}', start_date)
  42. path = path.replace('${dt}', start_date)
  43. # hadoop fs -test -e
  44. if subprocess.run(['hadoop', 'fs', '-test', '-e', path],
  45. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0:
  46. return 'missing'
  47. # hadoop fs -du -s → 第一列是字节数
  48. r = subprocess.run(['hadoop', 'fs', '-du', '-s', path],
  49. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  50. if r.returncode == 0:
  51. try:
  52. size = int(r.stdout.decode('utf-8', errors='replace').strip().split()[0])
  53. if size == 0:
  54. return 'empty'
  55. except (IndexError, ValueError):
  56. pass
  57. return 'ok'
  58. def run_job(ini_path: str,
  59. start_date: str,
  60. stop_date: str,
  61. worker_host: str,
  62. current_host: str,
  63. base_dir: str,
  64. python3_path: str,
  65. datax_home: str,
  66. skip_datax: bool = False,
  67. skip_check: bool = False,
  68. speed_overrides: dict = None,
  69. stdout=None,
  70. stderr=None,
  71. tee_to=None) -> int:
  72. """
  73. 单任务执行。
  74. skip_check=False(默认):hdfs reader 跑源路径存在性 check,missing/empty → return 1 失败
  75. skip_check=True:跳过整段 check,直接生成 json + 跑 datax(对齐老 silent skip 行为)
  76. speed_overrides: L3 CLI 透传到 gen-json,形如 {'channel': 20, 'byte': None, 'record': 50000}
  77. """
  78. # 1. HDFS 源路径 check(默认开启;--skip-check 显式关闭后走老 silent skip 行为)
  79. if not skip_check:
  80. src_state = _hdfs_src_check(ini_path, start_date)
  81. if src_state == 'missing':
  82. print('[datax] {ini} HDFS 源路径不存在,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path))
  83. return 1
  84. if src_state == 'empty':
  85. print('[datax] {ini} HDFS 源路径空,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path))
  86. return 1
  87. json_path = path_utils.json_output_path(base_dir, ini_path)
  88. os.makedirs(os.path.dirname(json_path), exist_ok=True)
  89. gen_argv = [
  90. python3_path, '-u',
  91. '-m', 'dw_base.datax.cli',
  92. 'gen-json', ini_path,
  93. '-start-date', start_date,
  94. '-stop-date', stop_date,
  95. ]
  96. # L3 CLI speed 透传(None 不传)
  97. if speed_overrides:
  98. for key in ('channel', 'byte', 'record'):
  99. if speed_overrides.get(key) is not None:
  100. gen_argv.extend(['-' + key, str(speed_overrides[key])])
  101. print('[datax] {ini} 开始生成 json @ worker={w}'.format(ini=ini_path, w=worker_host))
  102. gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
  103. stdout=stdout, stderr=stderr, tee_to=tee_to)
  104. if gen_rc != 0:
  105. raise RuntimeError('生成 DataX json 失败: ' + ini_path)
  106. print('[datax] {ini} json 生成完成'.format(ini=ini_path))
  107. if skip_datax:
  108. print('[datax] {ini} skip_datax=True,跳过执行'.format(ini=ini_path))
  109. return 0
  110. exec_argv = [
  111. python3_path, '-u',
  112. os.path.join(datax_home, 'bin', 'datax.py'),
  113. json_path,
  114. ]
  115. print('[datax] {ini} 开始执行 datax.py @ worker={w}'.format(ini=ini_path, w=worker_host))
  116. rc = _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
  117. stdout=stdout, stderr=stderr, tee_to=tee_to)
  118. print('[datax] {ini} datax.py 执行完成 rc={rc}'.format(ini=ini_path, rc=rc))
  119. return rc
  120. def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
  121. stdout=None, stderr=None, tee_to=None) -> int:
  122. """
  123. 本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir
  124. 远端:ssh worker_host 'cd <base_dir> && <cmd>'
  125. tee_to 优先于 stdout/stderr:提供 tee_to 时走 Popen 行循环 tee(文件 + 父 stdout)
  126. """
  127. if worker_host == current_host:
  128. env = os.environ.copy()
  129. existing = env.get('PYTHONPATH', '')
  130. env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
  131. if tee_to is not None:
  132. return _run_with_tee(argv, tee_to, env=env)
  133. return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
  134. remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
  135. ssh_argv = ['ssh', worker_host, remote_cmd]
  136. if tee_to is not None:
  137. return _run_with_tee(ssh_argv, tee_to)
  138. return subprocess.run(ssh_argv, stdout=stdout, stderr=stderr).returncode
  139. def _run_with_tee(argv, log_fh, env=None) -> int:
  140. """
  141. Popen + 行循环 tee:subprocess 的 stdout/stderr 合并后同时写 log_fh 和 sys.stdout。
  142. 对齐老脚本 bash 层的 `| tee LOG_FILE` 行为(串行模式独立 log 文件)。
  143. """
  144. proc = subprocess.Popen(argv,
  145. stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
  146. env=env, bufsize=1, universal_newlines=True)
  147. for line in proc.stdout:
  148. sys.stdout.write(line)
  149. sys.stdout.flush()
  150. log_fh.write(line)
  151. log_fh.flush()
  152. return proc.wait()