#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Hive DDL 生成器(raw 层;ods 层占位待实施)。
**仅支持 PG 源**:reader.dataSource 必须是 `postgresql/{env}-{instance}`
形式;mysql 等其他源由复用的 datax-sync-template-gen.resolve_datasource
直接 NotImplementedError。
输入 sync ini,从 PG 抽字段中文注释,按 reader.column 顺序渲染
全字段 STRING + dt STRING 分区 + ORC + EXTERNAL TABLE,写到 stdout
(传 -o 时额外落盘 {table_name}_create.sql)。
CLI:
python3 bin/hive-ddl-gen.py -l raw -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
参数:
-l 层级(raw 必填;ods 暂未实施,传入直接 NotImplementedError)
-ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
-o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
workspace/{yyyymmdd}/;带值额外落盘
/)
表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位)。
"""
import argparse
import importlib.util
import os
import sys
from configparser import ConfigParser
from datetime import datetime
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
def _load_sync_gen():
"""复用 datax-sync-template-gen 的 resolve_datasource / parse_jdbc_url /
query_columns_full / DS_POSTGRE_SQL_JDBC_URL(脚本名含连字符,importlib 加载)。
"""
spec = importlib.util.spec_from_file_location(
'datax_sync_template_gen',
os.path.join(project_root, 'bin', 'datax-sync-template-gen.py'),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
SYNC_GEN = _load_sync_gen()
WORKSPACE_DEFAULT = os.path.join(
project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
)
def _resolve_to_project_root(path):
if os.path.isabs(path):
return path
return os.path.join(project_root, path)
def parse_sync_ini(path):
"""解析 sync ini,提取 raw DDL 渲染所需 5 项。"""
if not os.path.isfile(path):
raise FileNotFoundError('sync ini 不存在: ' + path)
cp = ConfigParser()
cp.read(path, encoding='utf-8')
if not cp.has_section('reader'):
raise KeyError('sync ini 缺 [reader] 段: ' + path)
if not cp.has_section('writer'):
raise KeyError('sync ini 缺 [writer] 段: ' + path)
ds_ref = cp.get('reader', 'dataSource').strip()
schema_table = cp.get('reader', 'table').strip()
columns_str = cp.get('reader', 'column').strip()
writer_path = cp.get('writer', 'path').strip()
if '.' not in schema_table:
raise ValueError('reader.table 必须 schema.table 格式: ' + schema_table)
schema, table = schema_table.split('.', 1)
columns = [c.strip() for c in columns_str.split(',') if c.strip()]
if not columns:
raise ValueError('reader.column 为空')
return {
'ds_ref': ds_ref,
'schema': schema,
'table': table,
'columns': columns,
'writer_path': writer_path,
}
def reverse_table_name(writer_path):
"""从 writer.path 反推 Hive 表名。
path 形如 /user/hive/warehouse/raw.db/{table_name}/dt=${dt}/
末段必须是 dt=... 占位,倒数第二段即表名。
"""
p = writer_path.rstrip('/')
parts = p.rsplit('/', 1)
if len(parts) != 2 or not parts[1].startswith('dt='):
raise ValueError(
'writer.path 末段必须是 dt=...,无法反推表名: ' + writer_path)
return parts[0].rsplit('/', 1)[-1]
def fetch_column_comments(ds_ref, schema, table):
"""连 PG 拿 schema.table 的 {字段名: 中文注释} dict。
复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
"""
ds = SYNC_GEN.resolve_datasource(ds_ref)
ds_dict = ds.parse()
jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
user = ds_dict['username']
password = ds_dict['password']
host, port, database = SYNC_GEN.parse_jdbc_url(jdbc_url)
import pg8000.dbapi
conn = pg8000.dbapi.connect(
host=host, port=port, database=database,
user=user, password=password,
)
try:
rows = SYNC_GEN.query_columns_full(conn, schema, table)
finally:
conn.close()
return {name: (comment or '') for _, name, comment, _, _ in rows}
def render_raw_ddl(table_name, columns, comment_dict):
"""渲染 raw 层 DDL:全字段 STRING + dt STRING 分区 + ORC + EXTERNAL。
字段顺序严格按 columns(已是 sync ini reader.column 裁剪后顺序);
字段注释从 comment_dict 按字段名查,缺失留空字符串。
"""
today = datetime.now().strftime('%Y-%m-%d')
width = max(len(c) for c in columns) + 4
lines = [
'-- 作者:',
'-- 日期:' + today,
'-- 工单:',
'-- 目的:',
'-- 状态:[待执行]',
'-- 备注:',
'',
'DROP TABLE IF EXISTS raw.' + table_name + ';',
'',
'CREATE EXTERNAL TABLE IF NOT EXISTS raw.' + table_name + ' (',
]
last_idx = len(columns) - 1
for i, col in enumerate(columns):
comma = ',' if i < last_idx else ''
comment = comment_dict.get(col, '').replace("'", "''")
lines.append(" {col:<{w}}STRING COMMENT '{comment}'{comma}".format(
col=col, w=width, comma=comma, comment=comment))
lines.extend([
')',
"COMMENT ''",
'PARTITIONED BY (dt STRING)',
'STORED AS ORC',
"LOCATION '/user/hive/warehouse/raw.db/" + table_name + "';",
'',
])
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(
prog='hive-ddl-gen',
description='Hive DDL 生成器(raw 层;ods 层占位待实施)',
)
parser.add_argument('-l', required=True, choices=['raw', 'ods'],
metavar='LAYER',
help='层级(raw 必填;ods 暂未实施直接报错)')
parser.add_argument('-ini', required=True, metavar='PATH',
help='sync ini 路径(按项目根解析相对路径)')
parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
metavar='DIR',
help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 /)')
args = parser.parse_args()
if args.l == 'ods':
raise NotImplementedError('ods 层 DDL 生成暂未实施(ADR-06)')
ini_path = _resolve_to_project_root(args.ini)
spec = parse_sync_ini(ini_path)
table_name = reverse_table_name(spec['writer_path'])
comment_dict = fetch_column_comments(
spec['ds_ref'], spec['schema'], spec['table'])
ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
sys.stdout.write(ddl)
if args.o is not None:
os.makedirs(args.o, exist_ok=True)
out_path = os.path.join(args.o, table_name + '_create.sql')
with open(out_path, 'w', encoding='utf-8') as f:
f.write(ddl)
print('已写入: ' + out_path, file=sys.stderr)
if __name__ == '__main__':
main()