#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ Hive DDL 生成器(raw / ods 双层)。 **仅支持 PG 源**:reader.dataSource 必须是 `postgresql/{env}-{instance}` 形式;mysql 等其他源由复用的 datax-sync-template-gen.resolve_datasource 直接 NotImplementedError。 输入 sync ini,从 PG 抽字段类型 + 中文注释: - raw 层:按 reader.column 顺序渲染全字段 STRING + dt STRING 分区 + ORC + EXTERNAL - ods 层:按 reader.column 顺序应用 conf/pg-to-hive-type.ini 类型映射, 末尾加 is_deleted BOOLEAN 软删归一字段,dt STRING 分区 + ORC + EXTERNAL; 不加 etl_time / src_sys / src_tbl 技术字段(详见 ADR-06) 写到 stdout(传 -o 时额外落盘 {table_name}_create.sql)。 CLI: python3 bin/hive-ddl-gen.py -l {raw|ods} -ini jobs/raw/{域}/{table}.ini [-o [DIR]] 参数: -l 层级(raw / ods,必填) -ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致) -o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 /) 表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位); ods 表名 = raw 表名首段 'raw_' 替换为 'ods_'。 """ import argparse import importlib.util import os import sys from configparser import ConfigParser from datetime import datetime project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) def _load_sync_gen(): """复用 datax-sync-template-gen 的 resolve_datasource / parse_jdbc_url / query_columns_full / DS_POSTGRE_SQL_JDBC_URL(脚本名含连字符,importlib 加载)。 """ spec = importlib.util.spec_from_file_location( 'datax_sync_template_gen', os.path.join(project_root, 'bin', 'datax-sync-template-gen.py'), ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod SYNC_GEN = _load_sync_gen() WORKSPACE_DEFAULT = os.path.join( project_root, 'workspace', datetime.now().strftime('%Y%m%d'), ) def _resolve_to_project_root(path): if os.path.isabs(path): return path return os.path.join(project_root, path) def parse_sync_ini(path): """解析 sync ini,提取 raw DDL 渲染所需 5 项。""" if not os.path.isfile(path): raise FileNotFoundError('sync ini 不存在: ' + path) cp = ConfigParser() cp.read(path, encoding='utf-8') if not cp.has_section('reader'): raise KeyError('sync ini 缺 [reader] 段: ' + path) if not cp.has_section('writer'): raise KeyError('sync ini 缺 [writer] 段: ' + path) ds_ref = cp.get('reader', 'dataSource').strip() schema_table = cp.get('reader', 'table').strip() columns_str = cp.get('reader', 'column').strip() writer_path = cp.get('writer', 'path').strip() if '.' not in schema_table: raise ValueError('reader.table 必须 schema.table 格式: ' + schema_table) schema, table = schema_table.split('.', 1) columns = [c.strip() for c in columns_str.split(',') if c.strip()] if not columns: raise ValueError('reader.column 为空') return { 'ds_ref': ds_ref, 'schema': schema, 'table': table, 'columns': columns, 'writer_path': writer_path, } def reverse_table_name(writer_path): """从 writer.path 反推 Hive 表名。 path 形如 /user/hive/warehouse/raw.db/{table_name}/dt=${dt}/ 末段必须是 dt=... 占位,倒数第二段即表名。 """ p = writer_path.rstrip('/') parts = p.rsplit('/', 1) if len(parts) != 2 or not parts[1].startswith('dt='): raise ValueError( 'writer.path 末段必须是 dt=...,无法反推表名: ' + writer_path) return parts[0].rsplit('/', 1)[-1] def fetch_column_comments(ds_ref, schema, table): """连 PG 拿 schema.table 的 {字段名: 中文注释} dict。 复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。 """ rows = _fetch_pg_column_rows(ds_ref, schema, table) return {name: (comment or '') for _, name, comment, _, _ in rows} def fetch_column_full_rows(ds_ref, schema, table): """连 PG 拿 schema.table 的全字段 [(attnum, attname, comment, pg_type, pk_flag), ...]。 ods 渲染需要 pg_type,raw 只用 comment。本函数返回原始 rows 给 ods 用。 """ return _fetch_pg_column_rows(ds_ref, schema, table) def _fetch_pg_column_rows(ds_ref, schema, table): ds = SYNC_GEN.resolve_datasource(ds_ref) ds_dict = ds.parse() jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL] user = ds_dict['username'] password = ds_dict['password'] host, port, database = SYNC_GEN.parse_jdbc_url(jdbc_url) import pg8000.dbapi conn = pg8000.dbapi.connect( host=host, port=port, database=database, user=user, password=password, ) try: return SYNC_GEN.query_columns_full(conn, schema, table) finally: conn.close() def normalize_pg_type(pg_type): """PG type → 映射 conf 查询用的 normalized key。 规则: - 小写 + 去首尾空格 - 去括号参数:'numeric(12,2)' → 'numeric','character varying(64)' → 'character varying' - 去时区后缀:'timestamp(6) without time zone' → 'timestamp' """ t = pg_type.lower().strip() if '(' in t and ')' in t: before = t[:t.index('(')].strip() after = t[t.index(')') + 1:].strip() t = (before + ' ' + after).strip() for suffix in ('without time zone', 'with time zone'): if t.endswith(suffix): t = t[:-len(suffix)].strip() return t def load_type_mapping(conf_path): """读 conf/pg-to-hive-type.ini 的 [mapping] 段,返回 {normalized_pg_type: hive_type}。""" if not os.path.isfile(conf_path): raise FileNotFoundError('类型映射 conf 不存在: ' + conf_path) cp = ConfigParser() cp.read(conf_path, encoding='utf-8') if not cp.has_section('mapping'): raise KeyError('类型映射 conf 缺 [mapping] 段: ' + conf_path) return dict(cp.items('mapping')) def map_pg_to_hive(pg_type, type_mapping): """PG 字段类型映射到 Hive 类型;未命中报错让人显式补规则。""" key = normalize_pg_type(pg_type) if key not in type_mapping: raise KeyError( "PG 类型 '{}'(normalized '{}')不在 conf/pg-to-hive-type.ini 映射表," "需显式补规则".format(pg_type, key)) return type_mapping[key] def reverse_ods_table_name(raw_table_name): """raw_xxx → ods_xxx;首段必须是 'raw_'。""" if not raw_table_name.startswith('raw_'): raise ValueError("raw 表名首段必须是 'raw_': " + raw_table_name) return 'ods_' + raw_table_name[len('raw_'):] def render_raw_ddl(table_name, columns, comment_dict): """渲染 raw 层 DDL:全字段 STRING + dt STRING 分区 + ORC + EXTERNAL。 字段顺序严格按 columns(已是 sync ini reader.column 裁剪后顺序); 字段注释从 comment_dict 按字段名查,缺失留空字符串。 """ today = datetime.now().strftime('%Y-%m-%d') width = max(len(c) for c in columns) + 4 lines = [ '-- 作者:', '-- 日期:' + today, '-- 工单:', '-- 目的:', '-- 状态:[待执行]', '-- 备注:', '', 'DROP TABLE IF EXISTS raw.' + table_name + ';', '', 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.' + table_name + ' (', ] last_idx = len(columns) - 1 for i, col in enumerate(columns): comma = ',' if i < last_idx else '' comment = comment_dict.get(col, '').replace("'", "''") lines.append(" {col:<{w}}STRING COMMENT '{comment}'{comma}".format( col=col, w=width, comma=comma, comment=comment)) lines.extend([ ')', "COMMENT ''", 'PARTITIONED BY (dt STRING)', 'STORED AS ORC', "LOCATION '/user/hive/warehouse/raw.db/" + table_name + "';", '', ]) return '\n'.join(lines) def render_ods_ddl(raw_table_name, columns, full_rows, type_mapping): """渲染 ods 层 DDL:typed 字段 + is_deleted 归一 + dt 分区 + ORC + EXTERNAL。 full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...] 来自 query_columns_full columns: sync ini reader.column 裁剪后字段列表(与 full_rows 字段名子集对齐) type_mapping: load_type_mapping 返回的 {normalized_pg_type: hive_type} 字段顺序按 columns(不依赖 full_rows attnum);缺类型 / 缺注释报错。 末尾加 is_deleted BOOLEAN 软删归一字段(注释固定)。 """ today = datetime.now().strftime('%Y-%m-%d') ods_table_name = reverse_ods_table_name(raw_table_name) width = max(len(c) for c in columns + ['is_deleted']) + 4 by_name = {r[1]: (r[3], r[2] or '') for r in full_rows} missing = [c for c in columns if c not in by_name] if missing: raise KeyError('reader.column 中字段 PG 元数据缺失: ' + ','.join(missing)) lines = [ '-- 作者:', '-- 日期:' + today, '-- 工单:', '-- 目的:', '-- 状态:[待执行]', '-- 备注:', '', 'DROP TABLE IF EXISTS ods.' + ods_table_name + ';', '', 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.' + ods_table_name + ' (', ] type_width = max(len(map_pg_to_hive(by_name[c][0], type_mapping)) for c in columns) type_width = max(type_width, len('BOOLEAN')) + 2 for col in columns: pg_type, comment = by_name[col] hive_type = map_pg_to_hive(pg_type, type_mapping) comment = comment.replace("'", "''") lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '{c}',".format( col=col, w=width, ht=hive_type, tw=type_width, c=comment)) lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '软删除归一(CASE WHEN del_* THEN TRUE)'".format( col='is_deleted', w=width, ht='BOOLEAN', tw=type_width)) lines.extend([ ')', "COMMENT ''", 'PARTITIONED BY (dt STRING)', 'STORED AS ORC', "LOCATION '/user/hive/warehouse/ods.db/" + ods_table_name + "';", '', ]) return '\n'.join(lines) def main(): parser = argparse.ArgumentParser( prog='hive-ddl-gen', description='Hive DDL 生成器(raw / ods 双层)', ) parser.add_argument('-l', required=True, choices=['raw', 'ods'], metavar='LAYER', help='层级(raw / ods,必填)') parser.add_argument('-ini', required=True, metavar='PATH', help='sync ini 路径(按项目根解析相对路径)') parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR', help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 /)') args = parser.parse_args() ini_path = _resolve_to_project_root(args.ini) spec = parse_sync_ini(ini_path) table_name = reverse_table_name(spec['writer_path']) if args.l == 'raw': comment_dict = fetch_column_comments( spec['ds_ref'], spec['schema'], spec['table']) ddl = render_raw_ddl(table_name, spec['columns'], comment_dict) out_table_name = table_name else: full_rows = fetch_column_full_rows( spec['ds_ref'], spec['schema'], spec['table']) type_mapping = load_type_mapping( os.path.join(project_root, 'conf', 'pg-to-hive-type.ini')) ddl = render_ods_ddl(table_name, spec['columns'], full_rows, type_mapping) out_table_name = reverse_ods_table_name(table_name) sys.stdout.write(ddl) if args.o is not None: os.makedirs(args.o, exist_ok=True) out_path = os.path.join(args.o, out_table_name + '_create.sql') with open(out_path, 'w', encoding='utf-8') as f: f.write(ddl) print('已写入: ' + out_path, file=sys.stderr) if __name__ == '__main__': main()