| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- # -*- coding:utf-8 -*-
- """
- DataX 入口门面(对应 bin/datax-{hive-import,hdfs-export}-starter.{sh,py} 底层)。
- - DataxImport: 目标=Hive/HDFS,自动预建 Hive 分区;典型 jobs/raw/
- - DataxExport: 源=HDFS,导出到外部;典型 jobs/ads/,无分区预建
- 共同流程:expand ini → for each (串/并行) → runner.run_job。
- Import 在 for 之前额外调 partition.execute_ddls 预建 Hive 分区。
- 分布式 worker 选择当前仍保留(老参数平迁);若 kb/93 ADR-02 正式采纳,
- worker.select_worker 会降级为"永远返回 current_host",此处无需改。
- """
- import getpass
- import os
- import socket
- from typing import List, Optional
- from dw_base.datax import batch, partition, path_utils, runner, worker
- def _is_release_user(release_user: str) -> bool:
- return getpass.getuser() == release_user
- def _is_in_release_dir(base_dir: str, release_root_dir: str, project_name: str) -> bool:
- expected = os.path.abspath(os.path.join(release_root_dir, project_name))
- return os.path.abspath(base_dir).startswith(expected)
- def _resolve_relative_to_base(path: str, base_dir: str) -> str:
- """ini / inis 相对路径按 base_dir 解析(不依赖 Python 进程 cwd)。绝对路径原样返回。"""
- if os.path.isabs(path):
- return path
- return os.path.join(base_dir, path)
class _BaseDatax:
    """Shared base: loads the workers pool, resolves environment facts, and
    builds the per-ini ``run_one`` closure used by both import and export.
    """

    def __init__(self,
                 base_dir: str,
                 workers_ini_path: str,
                 release_user: str,
                 release_root_dir: str,
                 python3_path: str,
                 datax_home: str,
                 log_root_dir: str,
                 log_module: str):
        self.base_dir = os.path.abspath(base_dir)
        self.pool = worker.load_workers_ini(workers_ini_path)
        self.release_user = release_user
        self.release_root_dir = release_root_dir
        self.python3_path = python3_path
        self.datax_home = datax_home
        self.log_root_dir = log_root_dir
        self.log_module = log_module
        # Project name is derived from the base dir name; hostname is the
        # short form (domain suffix stripped).
        self.project_name = os.path.basename(self.base_dir)
        self.current_host = socket.gethostname().split('.')[0]

    def _make_run_one(self,
                      start_date: str,
                      stop_date: str,
                      host: Optional[str],
                      use_random: bool,
                      parallel: bool,
                      skip_datax: bool):
        """Return a closure ``ini_path -> rc`` that runs a single DataX job.

        Release-user / release-dir facts are computed once here, outside the
        per-job closure, since they do not change between jobs.
        """
        is_rel_user = _is_release_user(self.release_user)
        is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)

        def _run_one(ini_path: str) -> int:
            w = worker.select_worker(
                self.pool,
                is_release_user=is_rel_user,
                is_in_release_dir=is_in_rel_dir,
                current_host=self.current_host,
                host=host,
                use_random=use_random,
            )
            job_name = path_utils.job_name_from_ini(ini_path)
            log_file = path_utils.log_path(self.log_root_dir, self.log_module, start_date, job_name)
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            print('[datax] ini={j} worker={w} log={lf}'.format(j=job_name, w=w, lf=log_file))
            # Parallel: per-task log file only, output NOT echoed to parent
            #   stdout (matches the legacy `> LOG 2>&1 &`).
            # Serial: tee to the per-task log file AND parent stdout
            #   (matches the legacy `| tee LOG_FILE`).
            # The run_job call itself is identical in both modes, so only the
            # log-sink kwargs vary (previously two duplicated call sites).
            with open(log_file, 'a', encoding='utf-8') as fh:
                sink = {'stdout': fh, 'stderr': fh} if parallel else {'tee_to': fh}
                rc = runner.run_job(
                    ini_path=ini_path, start_date=start_date, stop_date=stop_date,
                    worker_host=w, current_host=self.current_host,
                    base_dir=self.base_dir, python3_path=self.python3_path,
                    datax_home=self.datax_home,
                    skip_datax=skip_datax,
                    **sink,
                )
            print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
            return rc

        return _run_one
class DataxImport(_BaseDatax):
    """Import facade (target = Hive/HDFS): pre-creates Hive partitions,
    then runs every DataX job. Typical job dir: jobs/raw/.
    """

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def _build_partition_ddls(self,
                              ini_list: List[str],
                              stop_date: str,
                              extra_partition_tables: Optional[List[str]]) -> List[str]:
        """Collect ADD PARTITION DDLs from each job ini plus any extra tables."""
        ddls = []
        for ini in ini_list:
            ddl = partition.parse_ini_partition(ini, stop_date)
            if ddl:
                ddls.append(ddl)
        if extra_partition_tables:
            dt = partition.compute_partition_dt(stop_date)
            for tbl in extra_partition_tables:
                ddls.append('ALTER TABLE {tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
                    tbl=tbl, dt=dt))
        return ddls

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_partition: bool = False,
            skip_datax: bool = False,
            extra_partition_tables: Optional[List[str]] = None) -> int:
        """Expand ini inputs, pre-create Hive partitions, run every job.

        Returns:
            Number of failed jobs (0 = all succeeded).
        """
        resolved_inis = [_resolve_relative_to_base(p, self.base_dir) for p in inis]
        resolved_dirs = [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs]
        ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
        if not ini_list:
            return 0
        if not skip_partition:
            # Pre-create Hive partitions before any DataX job writes data.
            partition.execute_ddls(
                self._build_partition_ddls(ini_list, stop_date, extra_partition_tables))
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed
class DataxExport(_BaseDatax):
    """Export facade (source = HDFS, no partition pre-creation).

    The source-path existence check still lives in the legacy script's
    check_data_exists and has not been migrated here.
    """

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_datax: bool = False) -> int:
        """Expand ini inputs and run every export job.

        Returns:
            Number of failed jobs (0 = all succeeded).
        """
        ini_list = batch.expand_ini_inputs(
            [_resolve_relative_to_base(p, self.base_dir) for p in inis],
            [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs],
        )
        if not ini_list:
            return 0
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
        failed = batch.run_batch(ini_list, run_one, parallel=parallel)[1]
        return failed
|