
feat(datax): add entry module (DataxImport / DataxExport facades)

_BaseDatax: shared workers pool + environment detection + _make_run_one closure
DataxImport: partition pre-creation (partition.execute_ddls) → batch dispatch
DataxExport: no partition pre-creation; source-HDFS existence check not yet migrated (legacy check_data_exists)
In parallel mode, each task's stdout/stderr is redirected to its own log file

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 week ago
commit 6e7192d02d
1 changed file with 163 additions and 0 deletions
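
A minimal wiring sketch for the import facade, e.g. from bin/datax-hive-import-starter.py; all paths, hosts, and project names below are hypothetical — only the constructor and run() signatures come from the diff:

from dw_base.datax.entry import DataxImport

# Hypothetical environment values; log_module is fixed to 'datax' by the subclass.
importer = DataxImport(
    base_dir='/data/projects/dw_demo',
    workers_ini_path='/etc/datax/workers.ini',
    release_user='dwadmin',
    release_root_dir='/data/release',
    python3_path='/usr/bin/python3',
    datax_home='/opt/datax',
    log_root_dir='/data/logs',
)

# run() returns the number of failed tasks (0 = all succeeded), so it maps
# directly onto a process exit code.
failed = importer.run(
    inis=[],
    inis_dirs=['jobs/raw'],   # expanded by batch.expand_ini_inputs
    start_date='2024-01-01',
    stop_date='2024-01-02',
    parallel=True,            # per-task log files, per the commit message
)
raise SystemExit(1 if failed else 0)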

+ 163 - 0
dw_base/datax/entry.py

@@ -0,0 +1,163 @@
+# -*- coding:utf-8 -*-
+"""
+DataX 入口门面(对应 bin/datax-{hive-import,hdfs-export}-starter.{sh,py} 底层)。
+
+- DataxImport: 目标=Hive/HDFS,自动预建 Hive 分区;典型 jobs/raw/
+- DataxExport: 源=HDFS,导出到外部;典型 jobs/ads/,无分区预建
+
+共同流程:expand ini → for each (串/并行) → runner.run_job。
+Import 在 for 之前额外调 partition.execute_ddls 预建 Hive 分区。
+
+分布式 worker 选择当前仍保留(老参数平迁);若 kb/93 ADR-02 正式采纳,
+worker.select_worker 会降级为"永远返回 current_host",此处无需改。
+"""
+import getpass
+import os
+import socket
+from typing import List, Optional
+
+from dw_base.datax import batch, partition, path_utils, runner, worker
+
+
+def _is_release_user(release_user: str) -> bool:
+    return getpass.getuser() == release_user
+
+
+def _is_in_release_dir(base_dir: str, release_root_dir: str, project_name: str) -> bool:
+    expected = os.path.abspath(os.path.join(release_root_dir, project_name))
+    return os.path.abspath(base_dir).startswith(expected)
+
+
+class _BaseDatax:
+    """共享基类:加载 workers pool、解析环境、构造 run_one 闭包。"""
+
+    def __init__(self,
+                 base_dir: str,
+                 workers_ini_path: str,
+                 release_user: str,
+                 release_root_dir: str,
+                 python3_path: str,
+                 datax_home: str,
+                 log_root_dir: str,
+                 log_module: str):
+        self.base_dir = os.path.abspath(base_dir)
+        self.pool = worker.load_workers_ini(workers_ini_path)
+        self.release_user = release_user
+        self.release_root_dir = release_root_dir
+        self.python3_path = python3_path
+        self.datax_home = datax_home
+        self.log_root_dir = log_root_dir
+        self.log_module = log_module
+        self.project_name = os.path.basename(self.base_dir)
+        self.current_host = socket.gethostname().split('.')[0]
+
+    def _make_run_one(self,
+                      start_date: str,
+                      stop_date: str,
+                      host: Optional[str],
+                      use_random: bool,
+                      parallel: bool,
+                      skip_datax: bool):
+        is_rel_user = _is_release_user(self.release_user)
+        is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)
+
+        def _run_one(ini_path: str) -> int:
+            w = worker.select_worker(
+                self.pool,
+                is_release_user=is_rel_user,
+                is_in_release_dir=is_in_rel_dir,
+                current_host=self.current_host,
+                host=host,
+                use_random=use_random,
+            )
+            job_name = path_utils.job_name_from_ini(ini_path)
+            log_file = path_utils.log_path(self.log_root_dir, self.log_module, start_date, job_name)
+            os.makedirs(os.path.dirname(log_file), exist_ok=True)
+            # Parallel: one log file per task (output is not echoed to the parent
+            # stdout, matching the legacy `> LOG 2>&1 &`).
+            # Serial: inherit the parent stdout (users can tail the log file or
+            # rely on an outer bash tee).
+            if parallel:
+                with open(log_file, 'a', encoding='utf-8') as fh:
+                    return runner.run_job(
+                        ini_path=ini_path, start_date=start_date, stop_date=stop_date,
+                        worker_host=w, current_host=self.current_host,
+                        base_dir=self.base_dir, python3_path=self.python3_path,
+                        datax_home=self.datax_home,
+                        skip_datax=skip_datax,
+                        stdout=fh, stderr=fh,
+                    )
+            return runner.run_job(
+                ini_path=ini_path, start_date=start_date, stop_date=stop_date,
+                worker_host=w, current_host=self.current_host,
+                base_dir=self.base_dir, python3_path=self.python3_path,
+                datax_home=self.datax_home,
+                skip_datax=skip_datax,
+            )
+        return _run_one
+
+
+class DataxImport(_BaseDatax):
+    """目标=Hive 导入(自动预建分区)。"""
+
+    def __init__(self, **kwargs):
+        super().__init__(log_module='datax', **kwargs)
+
+    def run(self,
+            inis: List[str],
+            inis_dirs: List[str],
+            start_date: str,
+            stop_date: str,
+            host: Optional[str] = None,
+            use_random: bool = False,
+            parallel: bool = False,
+            skip_partition: bool = False,
+            skip_datax: bool = False,
+            extra_partition_tables: Optional[List[str]] = None) -> int:
+        """
+        Returns: 失败任务数(0 = 全部成功)
+        """
+        ini_list = batch.expand_ini_inputs(inis, inis_dirs)
+        if not ini_list:
+            return 0
+
+        if not skip_partition:
+            ddls = []
+            for ini in ini_list:
+                ddl = partition.parse_ini_partition(ini, stop_date)
+                if ddl:
+                    ddls.append(ddl)
+            if extra_partition_tables:
+                dt = partition.compute_partition_dt(stop_date)
+                for tbl in extra_partition_tables:
+                    ddls.append('ALTER TABLE {tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
+                        tbl=tbl, dt=dt))
+            partition.execute_ddls(ddls)
+
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
+        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
+        return failed
+
+
+class DataxExport(_BaseDatax):
+    """源=HDFS 导出(无分区预建;源路径存在性 check 沿用老脚本 check_data_exists 行为,暂未搬迁)。"""
+
+    def __init__(self, **kwargs):
+        super().__init__(log_module='datax', **kwargs)
+
+    def run(self,
+            inis: List[str],
+            inis_dirs: List[str],
+            start_date: str,
+            stop_date: str,
+            host: Optional[str] = None,
+            use_random: bool = False,
+            parallel: bool = False,
+            skip_datax: bool = False) -> int:
+        """
+        Returns: 失败任务数(0 = 全部成功)
+        """
+        ini_list = batch.expand_ini_inputs(inis, inis_dirs)
+        if not ini_list:
+            return 0
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
+        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
+        return failed
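
The extra_partition_tables hook in DataxImport.run simply appends one more ALTER TABLE statement per table to the same DDL batch. A sketch of what that produces, with a hypothetical table name and whatever date format partition.compute_partition_dt actually returns:

from dw_base.datax import partition

dt = partition.compute_partition_dt('2024-01-02')
# Mirrors the format string in DataxImport.run; 'ads.daily_report' is hypothetical.
ddl = 'ALTER TABLE {tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
    tbl='ads.daily_report', dt=dt)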
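
The export facade is wired the same way, minus the partition step; a minimal sketch with hypothetical values:

from dw_base.datax.entry import DataxExport

exporter = DataxExport(
    base_dir='/data/projects/dw_demo',
    workers_ini_path='/etc/datax/workers.ini',
    release_user='dwadmin',
    release_root_dir='/data/release',
    python3_path='/usr/bin/python3',
    datax_home='/opt/datax',
    log_root_dir='/data/logs',
)
failed = exporter.run(
    inis=['jobs/ads/ads_report.ini'],  # hypothetical job ini
    inis_dirs=[],
    start_date='2024-01-01',
    stop_date='2024-01-02',
    host='worker-03',  # pin a specific worker instead of pool/random selection
)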