partition.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX hive-import 场景的分区管理(从 bin/datax-multiple-hive-job-starter.sh:132-158 parse_ddl
  4. + L229-241 HIVE_DDL 执行段搬迁 Python 版)。
  5. 职责:
  6. - parse_ini_partition: 从 ini 的 writer.path 抽 {db}.{table},返回 ALTER DDL;非分区表返回 None
  7. - execute_ddls: 调 hive -e 执行一批 ALTER DDL
  8. 关键约束:分区 dt = start_date(业务日),与 HDFS writer 对齐;分区内允许含次日漂移
  9. 数据(按 ADR-03 raw 不纠正分区漂移)。配套 raw 48h 宽窗:where 用 [start, stop) 而 dt
  10. 跟 start_date,抓到的"漂到次日"记录统一落 dt=start_date 分区,由 ods 按 update_time 归位。
  11. """
  12. import subprocess
  13. from configparser import ConfigParser
  14. from typing import List, Optional
  15. _DT_PLACEHOLDER = '/dt=${dt}'
  16. def parse_ini_partition(ini_path: str, start_date: str) -> Optional[str]:
  17. """
  18. 读 ini 的 writer.path 提取 {db}.{table},生成 ALTER ADD PARTITION DDL。
  19. 分区 dt = start_date(业务日)。
  20. 非分区表(path 不含 /dt=${dt})或找不到 {db}.{table} 段 → None。
  21. """
  22. cp = ConfigParser()
  23. cp.read(ini_path)
  24. if not cp.has_option('writer', 'path'):
  25. return None
  26. path = cp.get('writer', 'path')
  27. if _DT_PLACEHOLDER not in path:
  28. return None
  29. db = None
  30. tbl = None
  31. parts = [p for p in path.split('/') if p]
  32. for i, seg in enumerate(parts):
  33. if seg.endswith('.db') and i + 1 < len(parts):
  34. db = seg[:-len('.db')]
  35. tbl = parts[i + 1]
  36. break
  37. if not (db and tbl):
  38. return None
  39. return 'ALTER TABLE {db}.{tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
  40. db=db, tbl=tbl, dt=start_date,
  41. )
  42. def execute_ddls(ddls: List[str]) -> None:
  43. """
  44. 调 hive -e 批量执行 ALTER DDL(一次 hive CLI 进程执行所有 DDL,空格分隔)。
  45. 空列表无操作;失败抛 RuntimeError 含 stderr 内容。
  46. """
  47. if not ddls:
  48. return
  49. full_sql = ' '.join(ddls)
  50. # Python 3.6.8 无 capture_output 参数,用 stdout=PIPE, stderr=PIPE
  51. result = subprocess.run(
  52. ['hive', '-e', full_sql],
  53. stdout=subprocess.PIPE,
  54. stderr=subprocess.PIPE,
  55. )
  56. if result.returncode != 0:
  57. stderr = result.stderr.decode('utf-8', errors='replace')
  58. raise RuntimeError('hive -e 执行分区 DDL 失败: ' + stderr)