|
@@ -3,19 +3,68 @@
|
|
|
DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。
|
|
DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。
|
|
|
|
|
|
|
|
流程(对齐老脚本 generate_job_config + run_single_datax_job):
|
|
流程(对齐老脚本 generate_job_config + run_single_datax_job):
|
|
|
-1. 生成 json(调 bin/datax-job-config-generator.py,本机 subprocess / 远端 ssh)
|
|
|
|
|
-2. 执行 datax.py(同样本机 / 远端分发)
|
|
|
|
|
|
|
+1. (hdfs reader 场景)HDFS 源路径存在性 check,对齐老 check_data_exists
|
|
|
|
|
+2. 生成 json (调 -m dw_base.datax.cli gen-json,本机 subprocess / 远端 ssh)
|
|
|
|
|
+3. 执行 datax.py(同样本机 / 远端分发)
|
|
|
|
|
|
|
|
-生成 json 通过 `python3 -m dw_base.datax.cli gen-json` 调用(替代老 bin shim);
|
|
|
|
|
-worker 选择由 worker.select_worker 提供,本模块只接收最终 worker_host。
|
|
|
|
|
|
|
+stdout 输出模式三选一:
|
|
|
|
|
+- 默认 stdout=None, tee_to=None → 继承父进程(最简)
|
|
|
|
|
+- stdout=fh, tee_to=None → 只写 fh(并行模式,不回父 stdout,对齐老 > LOG 2>&1 &)
|
|
|
|
|
+- stdout=None, tee_to=fh → tee 到 fh + 父 stdout(串行模式,对齐老 | tee LOG_FILE)
|
|
|
"""
|
|
"""
|
|
|
import os
|
|
import os
|
|
|
import shlex
|
|
import shlex
|
|
|
import subprocess
|
|
import subprocess
|
|
|
|
|
+import sys
|
|
|
|
|
+from configparser import ConfigParser
|
|
|
|
|
|
|
|
from dw_base.datax import path_utils
|
|
from dw_base.datax import path_utils
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+def _hdfs_src_check(ini_path: str, start_date: str) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ HDFS 源路径存在性 + 空检查(对齐老 datax-single-job-starter.sh:128-146 check_data_exists)。
|
|
|
|
|
+
|
|
|
|
|
+ 仅对 reader 是 hdfs 的 ini 触发;其他 reader 返回 'n/a'。
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 'ok' - 路径存在且有数据
|
|
|
|
|
+ 'missing' - 路径不存在(hadoop fs -test -e 失败)
|
|
|
|
|
+ 'empty' - 路径存在但空(hadoop fs -du -s 返回 0)
|
|
|
|
|
+ 'n/a' - 非 hdfs reader,不需要检查
|
|
|
|
|
+ """
|
|
|
|
|
+ cp = ConfigParser()
|
|
|
|
|
+ cp.read(ini_path)
|
|
|
|
|
+ if not cp.has_option('reader', 'dataSource'):
|
|
|
|
|
+ return 'n/a'
|
|
|
|
|
+ ds = cp.get('reader', 'dataSource')
|
|
|
|
|
+ if not ds.startswith('hdfs/'):
|
|
|
|
|
+ return 'n/a'
|
|
|
|
|
+ if not cp.has_option('reader', 'path'):
|
|
|
|
|
+ return 'n/a'
|
|
|
|
|
+ path = cp.get('reader', 'path')
|
|
|
|
|
+ # 替换占位符(对齐 hdfs_reader.load_others L27-32)
|
|
|
|
|
+ path = path.replace('${start_date}', start_date)
|
|
|
|
|
+ path = path.replace('${start-date}', start_date)
|
|
|
|
|
+ path = path.replace('${dt}', start_date)
|
|
|
|
|
+
|
|
|
|
|
+ # hadoop fs -test -e
|
|
|
|
|
+ if subprocess.run(['hadoop', 'fs', '-test', '-e', path],
|
|
|
|
|
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0:
|
|
|
|
|
+ return 'missing'
|
|
|
|
|
+ # hadoop fs -du -s → 第一列是字节数
|
|
|
|
|
+ r = subprocess.run(['hadoop', 'fs', '-du', '-s', path],
|
|
|
|
|
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
|
+ if r.returncode == 0:
|
|
|
|
|
+ try:
|
|
|
|
|
+ size = int(r.stdout.decode('utf-8', errors='replace').strip().split()[0])
|
|
|
|
|
+ if size == 0:
|
|
|
|
|
+ return 'empty'
|
|
|
|
|
+ except (IndexError, ValueError):
|
|
|
|
|
+ pass
|
|
|
|
|
+ return 'ok'
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def run_job(ini_path: str,
|
|
def run_job(ini_path: str,
|
|
|
start_date: str,
|
|
start_date: str,
|
|
|
stop_date: str,
|
|
stop_date: str,
|
|
@@ -26,28 +75,20 @@ def run_job(ini_path: str,
|
|
|
datax_home: str,
|
|
datax_home: str,
|
|
|
skip_datax: bool = False,
|
|
skip_datax: bool = False,
|
|
|
stdout=None,
|
|
stdout=None,
|
|
|
- stderr=None) -> int:
|
|
|
|
|
|
|
+ stderr=None,
|
|
|
|
|
+ tee_to=None) -> int:
|
|
|
"""
|
|
"""
|
|
|
- 单任务执行:生成 json → 执行 datax.py(本机或 ssh 到远端)。
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- ini_path: DataX ini 绝对路径
|
|
|
|
|
- start_date / stop_date: yyyyMMdd
|
|
|
|
|
- worker_host: 已由 worker.select_worker 决定的目标节点
|
|
|
|
|
- current_host: hostname -s
|
|
|
|
|
- base_dir: 项目根绝对路径
|
|
|
|
|
- python3_path: python3 可执行路径(conf/env.sh 的 PYTHON3_PATH)
|
|
|
|
|
- datax_home: DataX 安装路径(conf/env.sh 的 DATAX_HOME)
|
|
|
|
|
- skip_datax: 只生成 json 不执行
|
|
|
|
|
- stdout / stderr: 传给 subprocess.run 的重定向 target(文件句柄等);
|
|
|
|
|
- None = 继承父进程。batch 并行模式传文件句柄隔离每任务日志
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- datax.py 的 returncode(0 = 成功);skip_datax=True 时返回 0
|
|
|
|
|
-
|
|
|
|
|
- Raises:
|
|
|
|
|
- RuntimeError: 生成 json 失败
|
|
|
|
|
|
|
+ 单任务执行。
|
|
|
"""
|
|
"""
|
|
|
|
|
+ # 1. HDFS 源路径 check(对齐老 check_data_exists;仅 hdfs reader 触发)
|
|
|
|
|
+ src_state = _hdfs_src_check(ini_path, start_date)
|
|
|
|
|
+ if src_state == 'missing':
|
|
|
|
|
+ print('[datax] {ini} HDFS 源路径不存在,跳过 datax'.format(ini=ini_path))
|
|
|
|
|
+ return 0
|
|
|
|
|
+ if src_state == 'empty':
|
|
|
|
|
+ print('[datax] {ini} HDFS 源路径空,跳过 datax'.format(ini=ini_path))
|
|
|
|
|
+ return 0
|
|
|
|
|
+
|
|
|
json_path = path_utils.json_output_path(base_dir, ini_path)
|
|
json_path = path_utils.json_output_path(base_dir, ini_path)
|
|
|
os.makedirs(os.path.dirname(json_path), exist_ok=True)
|
|
os.makedirs(os.path.dirname(json_path), exist_ok=True)
|
|
|
|
|
|
|
@@ -58,12 +99,15 @@ def run_job(ini_path: str,
|
|
|
'-start-date', start_date,
|
|
'-start-date', start_date,
|
|
|
'-stop-date', stop_date,
|
|
'-stop-date', stop_date,
|
|
|
]
|
|
]
|
|
|
|
|
+ print('[datax] {ini} 开始生成 json @ worker={w}'.format(ini=ini_path, w=worker_host))
|
|
|
gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
|
|
gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
|
|
|
- stdout=stdout, stderr=stderr)
|
|
|
|
|
|
|
+ stdout=stdout, stderr=stderr, tee_to=tee_to)
|
|
|
if gen_rc != 0:
|
|
if gen_rc != 0:
|
|
|
raise RuntimeError('生成 DataX json 失败: ' + ini_path)
|
|
raise RuntimeError('生成 DataX json 失败: ' + ini_path)
|
|
|
|
|
+ print('[datax] {ini} json 生成完成'.format(ini=ini_path))
|
|
|
|
|
|
|
|
if skip_datax:
|
|
if skip_datax:
|
|
|
|
|
+ print('[datax] {ini} skip_datax=True,跳过执行'.format(ini=ini_path))
|
|
|
return 0
|
|
return 0
|
|
|
|
|
|
|
|
exec_argv = [
|
|
exec_argv = [
|
|
@@ -71,21 +115,45 @@ def run_job(ini_path: str,
|
|
|
os.path.join(datax_home, 'bin', 'datax.py'),
|
|
os.path.join(datax_home, 'bin', 'datax.py'),
|
|
|
json_path,
|
|
json_path,
|
|
|
]
|
|
]
|
|
|
- return _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
|
|
|
|
|
- stdout=stdout, stderr=stderr)
|
|
|
|
|
|
|
+ print('[datax] {ini} 开始执行 datax.py @ worker={w}'.format(ini=ini_path, w=worker_host))
|
|
|
|
|
+ rc = _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
|
|
|
|
|
+ stdout=stdout, stderr=stderr, tee_to=tee_to)
|
|
|
|
|
+ print('[datax] {ini} datax.py 执行完成 rc={rc}'.format(ini=ini_path, rc=rc))
|
|
|
|
|
+ return rc
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
|
|
def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
|
|
|
- stdout=None, stderr=None) -> int:
|
|
|
|
|
|
|
+ stdout=None, stderr=None, tee_to=None) -> int:
|
|
|
"""
|
|
"""
|
|
|
- 本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir(让 python -m dw_base.datax.cli 能找到包)
|
|
|
|
|
- 远端:ssh worker_host 'cd <base_dir> && <cmd>'(cwd 为项目根,同样保证 -m 能找到 dw_base)
|
|
|
|
|
- stdout/stderr 默认继承父进程;batch 层可传文件句柄做每任务独立日志。
|
|
|
|
|
|
|
+ 本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir
|
|
|
|
|
+ 远端:ssh worker_host 'cd <base_dir> && <cmd>'
|
|
|
|
|
+ tee_to 优先于 stdout/stderr:提供 tee_to 时走 Popen 行循环 tee(文件 + 父 stdout)
|
|
|
"""
|
|
"""
|
|
|
if worker_host == current_host:
|
|
if worker_host == current_host:
|
|
|
env = os.environ.copy()
|
|
env = os.environ.copy()
|
|
|
existing = env.get('PYTHONPATH', '')
|
|
existing = env.get('PYTHONPATH', '')
|
|
|
env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
|
|
env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
|
|
|
|
|
+ if tee_to is not None:
|
|
|
|
|
+ return _run_with_tee(argv, tee_to, env=env)
|
|
|
return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
|
|
return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
|
|
|
remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
|
|
remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
|
|
|
- return subprocess.run(['ssh', worker_host, remote_cmd], stdout=stdout, stderr=stderr).returncode
|
|
|
|
|
|
|
+ ssh_argv = ['ssh', worker_host, remote_cmd]
|
|
|
|
|
+ if tee_to is not None:
|
|
|
|
|
+ return _run_with_tee(ssh_argv, tee_to)
|
|
|
|
|
+ return subprocess.run(ssh_argv, stdout=stdout, stderr=stderr).returncode
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _run_with_tee(argv, log_fh, env=None) -> int:
|
|
|
|
|
+ """
|
|
|
|
|
+ Popen + 行循环 tee:subprocess 的 stdout/stderr 合并后同时写 log_fh 和 sys.stdout。
|
|
|
|
|
+ 对齐老脚本 bash 层的 `| tee LOG_FILE` 行为(串行模式独立 log 文件)。
|
|
|
|
|
+ """
|
|
|
|
|
+ proc = subprocess.Popen(argv,
|
|
|
|
|
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
|
|
|
|
+ env=env, bufsize=1, universal_newlines=True)
|
|
|
|
|
+ for line in proc.stdout:
|
|
|
|
|
+ sys.stdout.write(line)
|
|
|
|
|
+ sys.stdout.flush()
|
|
|
|
|
+ log_fh.write(line)
|
|
|
|
|
+ log_fh.flush()
|
|
|
|
|
+ return proc.wait()
|