# -*- coding:utf-8 -*-
"""
DataX single-job runner (a Python port of bin/datax-single-job-starter.sh).

Flow (mirrors the old script's generate_job_config + run_single_datax_job):
1. (hdfs reader only) check that the HDFS source path exists, mirroring the
   old check_data_exists
2. generate the job json (invoke -m dw_base.datax.cli gen-json, via a local
   subprocess or remote ssh)
3. run datax.py (dispatched locally or remotely the same way)

stdout handling is one of three modes:
- default stdout=None, tee_to=None → inherit the parent process (simplest)
- stdout=fh, tee_to=None → write to fh only (parallel mode, nothing echoed to
  the parent stdout; mirrors the old `> LOG 2>&1 &`)
- stdout=None, tee_to=fh → tee to fh plus the parent stdout (serial mode,
  mirrors the old `| tee LOG_FILE`)
"""
import os
import shlex
import subprocess
import sys
from configparser import ConfigParser
from typing import Optional

from dw_base.datax import path_utils
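
# A minimal sketch of the three stdout modes described above (hosts and paths
# are hypothetical):
#
#   run_job('jobs/foo.ini', '2024-01-01', '2024-01-02',
#           worker_host='w1', current_host='w1', base_dir='/opt/dw',
#           python3_path='/usr/bin/python3', datax_home='/opt/datax')
#
#   with open('foo.log', 'w') as fh:                # parallel: log file only
#       run_job(..., stdout=fh, stderr=subprocess.STDOUT)
#
#   with open('foo.log', 'w') as fh:                # serial: log file + stdout
#       run_job(..., tee_to=fh)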


def _hdfs_src_check(ini_path: str, start_date: str) -> str:
    """
    HDFS source-path existence + emptiness check (mirrors the old
    datax-single-job-starter.sh:128-146 check_data_exists).
    Only triggered for ini files whose reader is hdfs; other readers
    return 'n/a'.

    Returns:
        'ok'      - path exists and holds data
        'missing' - path does not exist (hadoop fs -test -e failed)
        'empty'   - path exists but is empty (hadoop fs -du -s returned 0)
        'n/a'     - not an hdfs reader, no check needed
    """
    cp = ConfigParser()
    cp.read(ini_path)
    if not cp.has_option('reader', 'dataSource'):
        return 'n/a'
    ds = cp.get('reader', 'dataSource')
    if not ds.startswith('hdfs/'):
        return 'n/a'
    if not cp.has_option('reader', 'path'):
        return 'n/a'
    path = cp.get('reader', 'path')
    # Substitute date placeholders (mirrors hdfs_reader.load_others L27-32).
    path = path.replace('${start_date}', start_date)
    path = path.replace('${start-date}', start_date)
    path = path.replace('${dt}', start_date)
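    # e.g. a path of 'hdfs://ns1/warehouse/events/dt=${dt}' with start_date
    # '2024-01-01' becomes 'hdfs://ns1/warehouse/events/dt=2024-01-01'
    # (the concrete path is hypothetical, for illustration only).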
    # hadoop fs -test -e: a non-zero return code means the path is absent.
    if subprocess.run(['hadoop', 'fs', '-test', '-e', path],
                      stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0:
        return 'missing'
    # hadoop fs -du -s: the first column of stdout is the byte size.
    r = subprocess.run(['hadoop', 'fs', '-du', '-s', path],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if r.returncode == 0:
        try:
            size = int(r.stdout.decode('utf-8', errors='replace').strip().split()[0])
            if size == 0:
                return 'empty'
        except (IndexError, ValueError):
            pass
    return 'ok'
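
# A sketch of an ini that triggers the check above (the section and option
# names are the ones read by this function; the values are hypothetical):
#
#   [reader]
#   dataSource = hdfs/prod_cluster
#   path = /warehouse/events/dt=${dt}
#
# Any other dataSource prefix, or a missing reader/path option, short-circuits
# to 'n/a' and no hadoop command is run.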


def run_job(ini_path: str,
            start_date: str,
            stop_date: str,
            worker_host: str,
            current_host: str,
            base_dir: str,
            python3_path: str,
            datax_home: str,
            skip_datax: bool = False,
            skip_check: bool = False,
            speed_overrides: Optional[dict] = None,
            stdout=None,
            stderr=None,
            tee_to=None) -> int:
    """
    Run a single job.

    skip_check=False (default): run the source-path check for hdfs readers;
        missing/empty → return 1 (failure)
    skip_check=True: skip the whole check and go straight to json generation +
        datax run (mirrors the old silent-skip behavior)
    speed_overrides: passed through from the L3 CLI to gen-json, e.g.
        {'channel': 20, 'byte': None, 'record': 50000}
    """
    # 1. HDFS source-path check (on by default; -skip-check explicitly
    #    disables it and falls back to the old silent-skip behavior).
    if not skip_check:
        src_state = _hdfs_src_check(ini_path, start_date)
        if src_state == 'missing':
            print('[datax] {ini} HDFS source path does not exist, job failed '
                  '(pass -skip-check if you really want to skip)'.format(ini=ini_path))
            return 1
        if src_state == 'empty':
            print('[datax] {ini} HDFS source path is empty, job failed '
                  '(pass -skip-check if you really want to skip)'.format(ini=ini_path))
            return 1
    json_path = path_utils.json_output_path(base_dir, ini_path)
    os.makedirs(os.path.dirname(json_path), exist_ok=True)
    gen_argv = [
        python3_path, '-u',
        '-m', 'dw_base.datax.cli',
        'gen-json', ini_path,
        '-start-date', start_date,
        '-stop-date', stop_date,
    ]
    # L3 CLI speed passthrough (None values are not forwarded).
    if speed_overrides:
        for key in ('channel', 'byte', 'record'):
            if speed_overrides.get(key) is not None:
                gen_argv.extend(['-' + key, str(speed_overrides[key])])
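    # e.g. speed_overrides={'channel': 20, 'byte': None, 'record': 50000}
    # appends: -channel 20 -record 50000 ('byte' is None, so it is dropped).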
    print('[datax] {ini} generating json @ worker={w}'.format(ini=ini_path, w=worker_host))
    gen_rc = _run_local_or_remote(gen_argv, worker_host, current_host, base_dir,
                                  stdout=stdout, stderr=stderr, tee_to=tee_to)
    if gen_rc != 0:
        raise RuntimeError('failed to generate DataX json: ' + ini_path)
    print('[datax] {ini} json generated'.format(ini=ini_path))
    if skip_datax:
        print('[datax] {ini} skip_datax=True, skipping execution'.format(ini=ini_path))
        return 0
    exec_argv = [
        python3_path, '-u',
        os.path.join(datax_home, 'bin', 'datax.py'),
        json_path,
    ]
    print('[datax] {ini} running datax.py @ worker={w}'.format(ini=ini_path, w=worker_host))
    rc = _run_local_or_remote(exec_argv, worker_host, current_host, base_dir,
                              stdout=stdout, stderr=stderr, tee_to=tee_to)
    print('[datax] {ini} datax.py finished rc={rc}'.format(ini=ini_path, rc=rc))
    return rc


def _run_local_or_remote(argv, worker_host: str, current_host: str, base_dir: str,
                         stdout=None, stderr=None, tee_to=None) -> int:
    """
    Local: run argv directly via subprocess, with base_dir injected into PYTHONPATH.
    Remote: ssh worker_host 'cd <base_dir> && <cmd>'.
    tee_to takes precedence over stdout/stderr: when tee_to is given, use the
    Popen line-loop tee (file + parent stdout).
    """
    if worker_host == current_host:
        env = os.environ.copy()
        existing = env.get('PYTHONPATH', '')
        env['PYTHONPATH'] = base_dir + (os.pathsep + existing if existing else '')
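        # e.g. base_dir='/opt/dw' plus an existing PYTHONPATH '/usr/lib/py'
        # yields '/opt/dw:/usr/lib/py' (hypothetical paths; base_dir is
        # prepended so it takes precedence).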
        if tee_to is not None:
            return _run_with_tee(argv, tee_to, env=env)
        return subprocess.run(argv, stdout=stdout, stderr=stderr, env=env).returncode
    remote_cmd = 'cd ' + shlex.quote(base_dir) + ' && ' + ' '.join(shlex.quote(a) for a in argv)
    ssh_argv = ['ssh', worker_host, remote_cmd]
    if tee_to is not None:
        return _run_with_tee(ssh_argv, tee_to)
    return subprocess.run(ssh_argv, stdout=stdout, stderr=stderr).returncode
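
# For a remote worker, _run_local_or_remote builds a command along these lines
# (hosts and paths hypothetical):
#
#   ssh worker-02 'cd /opt/dw && /usr/bin/python3 -u -m dw_base.datax.cli gen-json jobs/foo.ini -start-date 2024-01-01 -stop-date 2024-01-02'
#
# shlex.quote on base_dir and every argv element keeps arguments containing
# spaces or shell metacharacters intact on the remote shell.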


def _run_with_tee(argv, log_fh, env=None) -> int:
    """
    Popen + line-loop tee: the subprocess's stdout/stderr are merged, and each
    line is written to both log_fh and sys.stdout.
    Mirrors the old script's bash-level `| tee LOG_FILE` (serial mode with a
    dedicated log file per job).
    """
    proc = subprocess.Popen(argv,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            env=env, bufsize=1, universal_newlines=True)
    for line in proc.stdout:
        sys.stdout.write(line)
        sys.stdout.flush()
        log_fh.write(line)
        log_fh.flush()
    return proc.wait()
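

# A hedged smoke run for local use, assuming dw_base is importable and the
# paths below exist (all values hypothetical); the json is generated but
# datax.py is not launched because skip_datax=True:
if __name__ == '__main__':
    import socket
    _host = socket.gethostname()
    with open('/tmp/datax-demo.log', 'w') as _fh:
        _rc = run_job('jobs/demo.ini', '2024-01-01', '2024-01-02',
                      worker_host=_host, current_host=_host,
                      base_dir='/opt/dw', python3_path='/usr/bin/python3',
                      datax_home='/opt/datax',
                      skip_datax=True,  # stop after json generation
                      tee_to=_fh)
    sys.exit(_rc)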
|