| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- # -*- coding:utf-8 -*-
- """
- DataX hive-import 场景的分区管理(从 bin/datax-multiple-hive-job-starter.sh:132-158 parse_ddl
- + L229-241 HIVE_DDL 执行段搬迁 Python 版)。
- 职责:
- - parse_ini_partition: 从 ini 的 writer.path 抽 {db}.{table},返回 ALTER DDL;非分区表返回 None
- - execute_ddls: 调 hive -e 执行一批 ALTER DDL
- 关键约束(kb/90 §2.6 实现建议 5):分区 dt 用 stop_date - 1 day,与 HDFS writer 对齐;
- 不沿用老脚本 START_DATE = dt 的假设(单日范围下两者相等没问题,多日范围错位)。
- """
- import subprocess
- from configparser import ConfigParser
- from datetime import datetime, timedelta
- from typing import List, Optional
- _DT_PLACEHOLDER = '/dt=${dt}'
- def compute_partition_dt(stop_date: str) -> str:
- """
- 分区日期 = stop_date - 1 day(与 HDFS writer 对齐)。
- stop_date 格式 yyyyMMdd。
- """
- stop_at = datetime.strptime(stop_date, '%Y%m%d')
- return (stop_at - timedelta(days=1)).strftime('%Y%m%d')
- def parse_ini_partition(ini_path: str, stop_date: str) -> Optional[str]:
- """
- 读 ini 的 writer.path 提取 {db}.{table},生成 ALTER ADD PARTITION DDL。
- 非分区表(path 不含 /dt=${dt})或找不到 {db}.{table} 段 → None。
- """
- cp = ConfigParser()
- cp.read(ini_path)
- if not cp.has_option('writer', 'path'):
- return None
- path = cp.get('writer', 'path')
- if _DT_PLACEHOLDER not in path:
- return None
- db = None
- tbl = None
- parts = [p for p in path.split('/') if p]
- for i, seg in enumerate(parts):
- if seg.endswith('.db') and i + 1 < len(parts):
- db = seg[:-len('.db')]
- tbl = parts[i + 1]
- break
- if not (db and tbl):
- return None
- dt = compute_partition_dt(stop_date)
- return 'ALTER TABLE {db}.{tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
- db=db, tbl=tbl, dt=dt,
- )
- def execute_ddls(ddls: List[str]) -> None:
- """
- 调 hive -e 批量执行 ALTER DDL(一次 hive CLI 进程执行所有 DDL,空格分隔)。
- 空列表无操作;失败抛 RuntimeError 含 stderr 内容。
- """
- if not ddls:
- return
- full_sql = ' '.join(ddls)
- # Python 3.6.8 无 capture_output 参数,用 stdout=PIPE, stderr=PIPE
- result = subprocess.run(
- ['hive', '-e', full_sql],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- if result.returncode != 0:
- stderr = result.stderr.decode('utf-8', errors='replace')
- raise RuntimeError('hive -e 执行分区 DDL 失败: ' + stderr)
|