# -*- coding:utf-8 -*-
"""
DataX entry facade (backing bin/datax-{hive-import,hdfs-export}-starter.{sh,py}).

- DataxImport: target = Hive/HDFS, pre-creates Hive partitions automatically; typically jobs/raw/
- DataxExport: source = HDFS, exports to external systems; typically jobs/ads/, no partition pre-creation

Shared flow: expand ini → for each (serial/parallel) → runner.run_job.
Import additionally calls partition.execute_ddls before the loop to pre-create Hive partitions.

Distributed worker selection is kept for now (legacy parameters carried over as-is);
if kb/93 ADR-02 is formally adopted, worker.select_worker degrades to "always return
current_host" and nothing here needs to change.
"""
import getpass
import os
import socket
from datetime import datetime, timedelta
from typing import List, Optional

from dw_base.datax import batch, partition, path_utils, runner, worker


def _is_release_user(release_user: str) -> bool:
    return getpass.getuser() == release_user


def _is_in_release_dir(base_dir: str, release_root_dir: str, project_name: str) -> bool:
    expected = os.path.abspath(os.path.join(release_root_dir, project_name))
    base = os.path.abspath(base_dir)
    # Match the project dir itself or anything under it (a bare prefix match
    # would wrongly accept sibling dirs such as <expected>2).
    return base == expected or base.startswith(expected + os.sep)


def _resolve_relative_to_base(path: str, base_dir: str) -> str:
    """Resolve a relative ini/inis path against base_dir (independent of the
    Python process cwd). Absolute paths are returned unchanged."""
    if os.path.isabs(path):
        return path
    return os.path.join(base_dir, path)


class _BaseDatax:
    """Shared base: loads the workers pool, resolves environment flags, and
    builds the run_one closure."""

    def __init__(self, base_dir: str, workers_ini_path: str, release_user: str,
                 release_root_dir: str, python3_path: str, datax_home: str,
                 log_root_dir: str, log_module: str):
        self.base_dir = os.path.abspath(base_dir)
        self.pool = worker.load_workers_ini(workers_ini_path)
        self.release_user = release_user
        self.release_root_dir = release_root_dir
        self.python3_path = python3_path
        self.datax_home = datax_home
        self.log_root_dir = log_root_dir
        self.log_module = log_module
        self.project_name = os.path.basename(self.base_dir)
        # Short hostname (domain part stripped) used for worker selection.
        self.current_host = socket.gethostname().split('.')[0]

    def _make_run_one(self, start_date: str, stop_date: str, host: Optional[str],
                      use_random: bool, parallel: bool, skip_datax: bool,
                      skip_check: bool, speed_overrides: Optional[dict] = None):
        is_rel_user = _is_release_user(self.release_user)
        is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir,
                                           self.project_name)

        def _run_one(ini_path: str) -> int:
            w = worker.select_worker(
                self.pool,
                is_release_user=is_rel_user,
                is_in_release_dir=is_in_rel_dir,
                current_host=self.current_host,
                host=host,
                use_random=use_random,
            )
            job_name = path_utils.job_name_from_ini(ini_path)
            log_file = path_utils.log_path(self.log_root_dir, self.log_module,
                                           start_date, job_name)
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            print('[datax] ini={j} worker={w} log={lf}'.format(j=job_name, w=w, lf=log_file))
            # Parallel: each task writes to its own log file and nothing goes back
            # to the parent stdout (matches the old `> LOG 2>&1 &`).
            # Serial: tee to its own log file plus the parent stdout (matches the
            # old `| tee LOG_FILE`).
            if parallel:
                with open(log_file, 'a', encoding='utf-8') as fh:
                    rc = runner.run_job(
                        ini_path=ini_path,
                        start_date=start_date,
                        stop_date=stop_date,
                        worker_host=w,
                        current_host=self.current_host,
                        base_dir=self.base_dir,
                        python3_path=self.python3_path,
                        datax_home=self.datax_home,
                        skip_datax=skip_datax,
                        skip_check=skip_check,
                        speed_overrides=speed_overrides,
                        stdout=fh,
                        stderr=fh,
                    )
                print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
                return rc
            with open(log_file, 'a', encoding='utf-8') as fh:
                rc = runner.run_job(
                    ini_path=ini_path,
                    start_date=start_date,
                    stop_date=stop_date,
                    worker_host=w,
                    current_host=self.current_host,
                    base_dir=self.base_dir,
                    python3_path=self.python3_path,
                    datax_home=self.datax_home,
                    skip_datax=skip_datax,
                    skip_check=skip_check,
                    speed_overrides=speed_overrides,
                    tee_to=fh,
                )
            print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
            return rc

        return _run_one
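

# Illustrative sketch (hypothetical, not called anywhere): the contract that
# batch.run_batch expects is simply "a callable taking an ini path and
# returning an int return code", which is exactly what _make_run_one builds
# above. _demo_run_one exists only to make that contract explicit.
def _demo_run_one(ini_path: str) -> int:
    """Toy run_one: report the ini it would run and claim success."""
    print('[demo] would run {p}'.format(p=ini_path))
    return 0  # 0 = success; batch.run_batch counts non-zero results as failed

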
class DataxImport(_BaseDatax):
    """Target = Hive import (partitions pre-created automatically)."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self, inis: List[str], inis_dirs: List[str], start_date: str,
            stop_date: str, host: Optional[str] = None, use_random: bool = False,
            parallel: bool = False, skip_partition: bool = False,
            skip_datax: bool = False,
            speed_overrides: Optional[dict] = None) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        resolved_inis = [_resolve_relative_to_base(p, self.base_dir) for p in inis]
        resolved_dirs = [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs]
        ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
        if not ini_list:
            return 0
        if not skip_partition:
            ddls = []
            for ini in ini_list:
                ddl = partition.parse_ini_partition(ini, stop_date)
                if ddl:
                    ddls.append(ddl)
            partition.execute_ddls(ddls)
        # An import's reader is never hdfs, so runner._hdfs_src_check returns n/a
        # early; passing skip_check through would be meaningless, so it is pinned
        # to False here.
        run_one = self._make_run_one(start_date, stop_date, host, use_random,
                                     parallel, skip_datax, skip_check=False,
                                     speed_overrides=speed_overrides)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed

    def backfill(self, inis: List[str], inis_dirs: List[str], start_date: str,
                 stop_date: str, host: Optional[str] = None, use_random: bool = False,
                 parallel: bool = False, skip_partition: bool = False,
                 skip_datax: bool = False,
                 speed_overrides: Optional[dict] = None) -> int:
        """
        Historical backfill: start_date/stop_date define the outer range
        [inclusive, exclusive); loops day by day, calling self.run() with
        single-day semantics. A failed day does not abort the loop; the next
        day still runs. Returns the total number of failed jobs across all
        days (0 = all succeeded).
        """
        day = datetime.strptime(start_date, '%Y%m%d').date()
        stop = datetime.strptime(stop_date, '%Y%m%d').date()
        if stop <= day:
            raise ValueError('backfill: stop_date must be later than start_date')
        total_days = (stop - day).days
        print('[backfill] backfilling {n} day(s): {s} → {e}'.format(
            n=total_days,
            s=day.strftime('%Y%m%d'),
            e=(stop - timedelta(days=1)).strftime('%Y%m%d'),
        ))
        total_failed = 0
        failed_days = []
        while day < stop:
            day_plus_1 = day + timedelta(days=1)
            sd = day.strftime('%Y%m%d')
            ed = day_plus_1.strftime('%Y%m%d')
            print('[backfill] {d} starting'.format(d=sd))
            failed = self.run(
                inis=inis,
                inis_dirs=inis_dirs,
                start_date=sd,
                stop_date=ed,
                host=host,
                use_random=use_random,
                parallel=parallel,
                skip_partition=skip_partition,
                skip_datax=skip_datax,
                speed_overrides=speed_overrides,
            )
            if failed > 0:
                total_failed += failed
                failed_days.append(sd)
                print('[backfill] {d}: {f} failed job(s)'.format(d=sd, f=failed))
            day = day_plus_1
        print('[backfill] done: {s} day(s) ok / {f} day(s) failed / {t} failed job(s) total'.format(
            s=total_days - len(failed_days), f=len(failed_days), t=total_failed))
        if failed_days:
            print('[backfill] failed days: ' + ','.join(failed_days))
        return total_failed
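

# Worked example (illustrative helper, not used by the classes): how
# DataxImport.backfill splits its [inclusive, exclusive) window into
# single-day runs. This mirrors the while-loop above, e.g.
# _demo_backfill_days('20240101', '20240104')
#   -> ['20240101', '20240102', '20240103'],
# each day then being run as [day, day + 1).
def _demo_backfill_days(start_date: str, stop_date: str) -> List[str]:
    day = datetime.strptime(start_date, '%Y%m%d').date()
    stop = datetime.strptime(stop_date, '%Y%m%d').date()
    days = []
    while day < stop:
        days.append(day.strftime('%Y%m%d'))
        day += timedelta(days=1)
    return days

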
class DataxExport(_BaseDatax):
    """Source = HDFS export (no partition pre-creation). The HDFS source
    existence check is on by default (missing/empty source → failure);
    -skip-check disables it and falls back to the old silent skip."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self, inis: List[str], inis_dirs: List[str], start_date: str,
            stop_date: str, host: Optional[str] = None, use_random: bool = False,
            parallel: bool = False, skip_datax: bool = False,
            skip_check: bool = False,
            speed_overrides: Optional[dict] = None) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        resolved_inis = [_resolve_relative_to_base(p, self.base_dir) for p in inis]
        resolved_dirs = [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs]
        ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
        if not ini_list:
            return 0
        run_one = self._make_run_one(start_date, stop_date, host, use_random,
                                     parallel, skip_datax, skip_check,
                                     speed_overrides)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed
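

# Usage sketch: how a starter script might drive this facade. All paths and
# values below are placeholders (assumptions), not real deployment config;
# only the keyword names come from _BaseDatax.__init__ and run() above.
if __name__ == '__main__':
    importer = DataxImport(
        base_dir='/opt/dw/projects/demo',             # hypothetical project dir
        workers_ini_path='/opt/dw/conf/workers.ini',  # hypothetical pool config
        release_user='dwadmin',
        release_root_dir='/opt/dw/projects',
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        log_root_dir='/var/log/dw',
    )
    failed_jobs = importer.run(
        inis=['jobs/raw/demo.ini'],  # relative paths resolve against base_dir
        inis_dirs=[],
        start_date='20240101',
        stop_date='20240102',
    )
    raise SystemExit(1 if failed_jobs else 0)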