# -*- coding:utf-8 -*- """ DataX 单任务执行器(从 bin/datax-single-job-starter.sh 搬迁 Python 版)。 流程(对齐老脚本 generate_job_config + run_single_datax_job): 1. (hdfs reader 场景)HDFS 源路径存在性 check,对齐老 check_data_exists 2. 生成 json (调 -m dw_base.datax.cli gen-json,本机 subprocess / 远端 ssh) 3. 执行 datax.py(同样本机 / 远端分发) stdout 输出模式三选一: - 默认 stdout=None, tee_to=None → 继承父进程(最简) - stdout=fh, tee_to=None → 只写 fh(并行模式,不回父 stdout,对齐老 > LOG 2>&1 &) - stdout=None, tee_to=fh → tee 到 fh + 父 stdout(串行模式,对齐老 | tee LOG_FILE) """ import os import shlex import subprocess import sys from configparser import ConfigParser from dw_base.datax import path_utils def _hdfs_src_check(ini_path: str, start_date: str) -> str: """ HDFS 源路径存在性 + 空检查(对齐老 datax-single-job-starter.sh:128-146 check_data_exists)。 仅对 reader 是 hdfs 的 ini 触发;其他 reader 返回 'n/a'。 Returns: 'ok' - 路径存在且有数据 'missing' - 路径不存在(hadoop fs -test -e 失败) 'empty' - 路径存在但空(hadoop fs -du -s 返回 0) 'n/a' - 非 hdfs reader,不需要检查 """ cp = ConfigParser() cp.read(ini_path) if not cp.has_option('reader', 'dataSource'): return 'n/a' ds = cp.get('reader', 'dataSource') if not ds.startswith('hdfs/'): return 'n/a' if not cp.has_option('reader', 'path'): return 'n/a' path = cp.get('reader', 'path') # 替换占位符(对齐 hdfs_reader.load_others L27-32) path = path.replace('${start_date}', start_date) path = path.replace('${start-date}', start_date) path = path.replace('${dt}', start_date) # hadoop fs -test -e if subprocess.run(['hadoop', 'fs', '-test', '-e', path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0: return 'missing' # hadoop fs -du -s → 第一列是字节数 r = subprocess.run(['hadoop', 'fs', '-du', '-s', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if r.returncode == 0: try: size = int(r.stdout.decode('utf-8', errors='replace').strip().split()[0]) if size == 0: return 'empty' except (IndexError, ValueError): pass return 'ok' def run_job(ini_path: str, start_date: str, stop_date: str, worker_host: str, current_host: str, base_dir: str, python3_path: str, datax_home: str, skip_datax: bool = False, skip_check: bool = False, speed_overrides: dict = None, stdout=None, stderr=None, tee_to=None) -> int: """ 单任务执行。 skip_check=False(默认):hdfs reader 跑源路径存在性 check,missing/empty → return 1 失败 skip_check=True:跳过整段 check,直接生成 json + 跑 datax(对齐老 silent skip 行为) speed_overrides: L3 CLI 透传到 gen-json,形如 {'channel': 20, 'byte': None, 'record': 50000} """ # 1. HDFS 源路径 check(默认开启;--skip-check 显式关闭后走老 silent skip 行为) if not skip_check: src_state = _hdfs_src_check(ini_path, start_date) if src_state == 'missing': print('[datax] {ini} HDFS 源路径不存在,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path)) return 1 if src_state == 'empty': print('[datax] {ini} HDFS 源路径空,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path)) return 1 json_path = path_utils.json_output_path(base_dir, ini_path) os.makedirs(os.path.dirname(json_path), exist_ok=True) gen_argv = [ python3_path, '-u', '-m', 'dw_base.datax.cli', 'gen-json', ini_path, '-start-date', start_date, '-stop-date', stop_date, ] # L3 CLI speed 透传(None 不传) if speed_overrides: for key in ('channel', 'byte', 'record'): if speed_overrides.get(key) is not None: gen_argv.extend(['-' + key, str(speed_overrides[key])]) print('[datax] {ini} 开始生成 json @ worker={w}'.format(ini=ini_path, w=worker_host)) gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir, stdout=stdout, stderr=stderr, tee_to=tee_to) if gen_rc != 0: raise RuntimeError('生成 DataX json 失败: ' + ini_path) print('[datax] {ini} json 生成完成'.format(ini=ini_path)) if skip_datax: print('[datax] {ini} skip_datax=True,跳过执行'.format(ini=ini_path)) return 0 exec_argv = [ python3_path, '-u', os.path.join(datax_home, 'bin', 'datax.py'), json_path, ] print('[datax] {ini} 开始执行 datax.py @ worker={w}'.format(ini=ini_path, w=worker_host)) rc = _run_local_or_remote(exec_argv, worker_host, current_host, base_dir, stdout=stdout, stderr=stderr, tee_to=tee_to) print('[datax] {ini} datax.py 执行完成 rc={rc}'.format(ini=ini_path, rc=rc)) return rc def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str, stdout=None, stderr=None, tee_to=None) -> int: """ 本机:subprocess.run 直跑 + PYTHONPATH 注入 base_dir 远端:ssh worker_host 'cd && ' tee_to 优先于 stdout/stderr:提供 tee_to 时走 Popen 行循环 tee(文件 + 父 stdout) """ if worker_host == current_host: env = os.environ.copy() existing = env.get('PYTHONPATH', '') env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '') if tee_to is not None: return _run_with_tee(argv, tee_to, env=env) return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv) ssh_argv = ['ssh', worker_host, remote_cmd] if tee_to is not None: return _run_with_tee(ssh_argv, tee_to) return subprocess.run(ssh_argv, stdout=stdout, stderr=stderr).returncode def _run_with_tee(argv, log_fh, env=None) -> int: """ Popen + 行循环 tee:subprocess 的 stdout/stderr 合并后同时写 log_fh 和 sys.stdout。 对齐老脚本 bash 层的 `| tee LOG_FILE` 行为(串行模式独立 log 文件)。 """ proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, bufsize=1, universal_newlines=True) for line in proc.stdout: sys.stdout.write(line) sys.stdout.flush() log_fh.write(line) log_fh.flush() return proc.wait()