فهرست منبع

feat(datax): 新增 partition 模块(parse_ddl 搬迁 + hive ALTER 执行)

parse_ini_partition 从 ini writer.path 抽 {db}.{table} 返回 ALTER DDL
compute_partition_dt 用 stop_date - 1 day 与 HDFS writer 对齐
(不沿用老脚本 START_DATE=dt 假设,多日范围不再错位;kb/90 §2.6 实现建议 5)
execute_ddls 调 hive -e 批量执行,空列表 no-op

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 هفته پیش
والد
کامیت
5fe9c80f52
1فایلهای تغییر یافته به همراه77 افزوده شده و 0 حذف شده
  1. 77 0
      dw_base/datax/partition.py

+ 77 - 0
dw_base/datax/partition.py

@@ -0,0 +1,77 @@
+# -*- coding:utf-8 -*-
+"""
+DataX hive-import 场景的分区管理(从 bin/datax-multiple-hive-job-starter.sh:132-158 parse_ddl
++ L229-241 HIVE_DDL 执行段搬迁 Python 版)。
+
+职责:
+- parse_ini_partition: 从 ini 的 writer.path 抽 {db}.{table},返回 ALTER DDL;非分区表返回 None
+- execute_ddls: 调 hive -e 执行一批 ALTER DDL
+
+关键约束(kb/90 §2.6 实现建议 5):分区 dt 用 stop_date - 1 day,与 HDFS writer 对齐;
+不沿用老脚本 START_DATE = dt 的假设(单日范围下两者相等没问题,多日范围错位)。
+"""
+import subprocess
+from configparser import ConfigParser
+from datetime import datetime, timedelta
+from typing import List, Optional
+
+
+_DT_PLACEHOLDER = '/dt=${dt}'
+
+
+def compute_partition_dt(stop_date: str) -> str:
+    """
+    分区日期 = stop_date - 1 day(与 HDFS writer 对齐)。
+    stop_date 格式 yyyyMMdd。
+    """
+    stop_at = datetime.strptime(stop_date, '%Y%m%d')
+    return (stop_at - timedelta(days=1)).strftime('%Y%m%d')
+
+
+def parse_ini_partition(ini_path: str, stop_date: str) -> Optional[str]:
+    """
+    读 ini 的 writer.path 提取 {db}.{table},生成 ALTER ADD PARTITION DDL。
+    非分区表(path 不含 /dt=${dt})或找不到 {db}.{table} 段 → None。
+    """
+    cp = ConfigParser()
+    cp.read(ini_path)
+    if not cp.has_option('writer', 'path'):
+        return None
+    path = cp.get('writer', 'path')
+    if _DT_PLACEHOLDER not in path:
+        return None
+
+    db = None
+    tbl = None
+    parts = [p for p in path.split('/') if p]
+    for i, seg in enumerate(parts):
+        if seg.endswith('.db') and i + 1 < len(parts):
+            db = seg[:-len('.db')]
+            tbl = parts[i + 1]
+            break
+    if not (db and tbl):
+        return None
+
+    dt = compute_partition_dt(stop_date)
+    return 'ALTER TABLE {db}.{tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
+        db=db, tbl=tbl, dt=dt,
+    )
+
+
+def execute_ddls(ddls: List[str]) -> None:
+    """
+    调 hive -e 批量执行 ALTER DDL(一次 hive CLI 进程执行所有 DDL,空格分隔)。
+    空列表无操作;失败抛 RuntimeError 含 stderr 内容。
+    """
+    if not ddls:
+        return
+    full_sql = ' '.join(ddls)
+    # Python 3.6.8 无 capture_output 参数,用 stdout=PIPE, stderr=PIPE
+    result = subprocess.run(
+        ['hive', '-e', full_sql],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+    if result.returncode != 0:
+        stderr = result.stderr.decode('utf-8', errors='replace')
+        raise RuntimeError('hive -e 执行分区 DDL 失败: ' + stderr)