# -*- coding:utf-8 -*- """ DataX hive-import 场景的分区管理(从 bin/datax-multiple-hive-job-starter.sh:132-158 parse_ddl + L229-241 HIVE_DDL 执行段搬迁 Python 版)。 职责: - parse_ini_partition: 从 ini 的 writer.path 抽 {db}.{table},返回 ALTER DDL;非分区表返回 None - execute_ddls: 调 hive -e 执行一批 ALTER DDL 关键约束:分区 dt = start_date(业务日),与 HDFS writer 对齐;分区内允许含次日漂移 数据(按 ADR-03 raw 不纠正分区漂移)。配套 raw 48h 宽窗:where 用 [start, stop) 而 dt 跟 start_date,抓到的"漂到次日"记录统一落 dt=start_date 分区,由 ods 按 update_time 归位。 """ import subprocess from configparser import ConfigParser from typing import List, Optional _DT_PLACEHOLDER = '/dt=${dt}' def parse_ini_partition(ini_path: str, start_date: str) -> Optional[str]: """ 读 ini 的 writer.path 提取 {db}.{table},生成 ALTER ADD PARTITION DDL。 分区 dt = start_date(业务日)。 非分区表(path 不含 /dt=${dt})或找不到 {db}.{table} 段 → None。 """ cp = ConfigParser() cp.read(ini_path) if not cp.has_option('writer', 'path'): return None path = cp.get('writer', 'path') if _DT_PLACEHOLDER not in path: return None db = None tbl = None parts = [p for p in path.split('/') if p] for i, seg in enumerate(parts): if seg.endswith('.db') and i + 1 < len(parts): db = seg[:-len('.db')] tbl = parts[i + 1] break if not (db and tbl): return None return 'ALTER TABLE {db}.{tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format( db=db, tbl=tbl, dt=start_date, ) def execute_ddls(ddls: List[str]) -> None: """ 调 hive -e 批量执行 ALTER DDL(一次 hive CLI 进程执行所有 DDL,空格分隔)。 空列表无操作;失败抛 RuntimeError 含 stderr 内容。 """ if not ddls: return full_sql = ' '.join(ddls) # Python 3.6.8 无 capture_output 参数,用 stdout=PIPE, stderr=PIPE result = subprocess.run( ['hive', '-e', full_sql], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if result.returncode != 0: stderr = result.stderr.decode('utf-8', errors='replace') raise RuntimeError('hive -e 执行分区 DDL 失败: ' + stderr)