|
@@ -1,26 +1,31 @@
|
|
|
#!/usr/bin/env /usr/bin/python3
|
|
#!/usr/bin/env /usr/bin/python3
|
|
|
# -*- coding:utf-8 -*-
|
|
# -*- coding:utf-8 -*-
|
|
|
"""
|
|
"""
|
|
|
-Hive DDL 生成器(raw 层;ods 层占位待实施)。
|
|
|
|
|
|
|
+Hive DDL 生成器(raw / ods 双层)。
|
|
|
|
|
|
|
|
**仅支持 PG 源**:reader.dataSource 必须是 `postgresql/{env}-{instance}`
|
|
**仅支持 PG 源**:reader.dataSource 必须是 `postgresql/{env}-{instance}`
|
|
|
形式;mysql 等其他源由复用的 datax-sync-template-gen.resolve_datasource
|
|
形式;mysql 等其他源由复用的 datax-sync-template-gen.resolve_datasource
|
|
|
直接 NotImplementedError。
|
|
直接 NotImplementedError。
|
|
|
|
|
|
|
|
-输入 sync ini,从 PG 抽字段中文注释,按 reader.column 顺序渲染
|
|
|
|
|
-全字段 STRING + dt STRING 分区 + ORC + EXTERNAL TABLE,写到 stdout
|
|
|
|
|
-(传 -o 时额外落盘 {table_name}_create.sql)。
|
|
|
|
|
|
|
+输入 sync ini,从 PG 抽字段类型 + 中文注释:
|
|
|
|
|
+ - raw 层:按 reader.column 顺序渲染全字段 STRING + dt STRING 分区 + ORC + EXTERNAL
|
|
|
|
|
+ - ods 层:按 reader.column 顺序应用 conf/pg-to-hive-type.ini 类型映射,
|
|
|
|
|
+ 末尾加 is_deleted BOOLEAN 软删归一字段,dt STRING 分区 + ORC + EXTERNAL;
|
|
|
|
|
+ 不加 etl_time / src_sys / src_tbl 技术字段(详见 ADR-06)
|
|
|
|
|
+
|
|
|
|
|
+写到 stdout(传 -o 时额外落盘 {table_name}_create.sql)。
|
|
|
|
|
|
|
|
CLI:
|
|
CLI:
|
|
|
- python3 bin/hive-ddl-gen.py -l raw -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
|
|
|
|
|
|
|
+ python3 bin/hive-ddl-gen.py -l {raw|ods} -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
|
|
|
|
|
|
|
|
参数:
|
|
参数:
|
|
|
- -l 层级(raw 必填;ods 暂未实施,传入直接 NotImplementedError)
|
|
|
|
|
|
|
+ -l 层级(raw / ods,必填)
|
|
|
-ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
|
|
-ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
|
|
|
-o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
|
|
-o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
|
|
|
workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)
|
|
workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)
|
|
|
|
|
|
|
|
-表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位)。
|
|
|
|
|
|
|
+表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位);
|
|
|
|
|
+ods 表名 = raw 表名首段 'raw_' 替换为 'ods_'。
|
|
|
"""
|
|
"""
|
|
|
import argparse
|
|
import argparse
|
|
|
import importlib.util
|
|
import importlib.util
|
|
@@ -109,6 +114,19 @@ def fetch_column_comments(ds_ref, schema, table):
|
|
|
|
|
|
|
|
复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
|
|
复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
|
|
|
"""
|
|
"""
|
|
|
|
|
+ rows = _fetch_pg_column_rows(ds_ref, schema, table)
|
|
|
|
|
+ return {name: (comment or '') for _, name, comment, _, _ in rows}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fetch_column_full_rows(ds_ref, schema, table):
|
|
|
|
|
+ """连 PG 拿 schema.table 的全字段 [(attnum, attname, comment, pg_type, pk_flag), ...]。
|
|
|
|
|
+
|
|
|
|
|
+ ods 渲染需要 pg_type,raw 只用 comment。本函数返回原始 rows 给 ods 用。
|
|
|
|
|
+ """
|
|
|
|
|
+ return _fetch_pg_column_rows(ds_ref, schema, table)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _fetch_pg_column_rows(ds_ref, schema, table):
|
|
|
ds = SYNC_GEN.resolve_datasource(ds_ref)
|
|
ds = SYNC_GEN.resolve_datasource(ds_ref)
|
|
|
ds_dict = ds.parse()
|
|
ds_dict = ds.parse()
|
|
|
jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
|
|
jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
|
|
@@ -122,10 +140,56 @@ def fetch_column_comments(ds_ref, schema, table):
|
|
|
user=user, password=password,
|
|
user=user, password=password,
|
|
|
)
|
|
)
|
|
|
try:
|
|
try:
|
|
|
- rows = SYNC_GEN.query_columns_full(conn, schema, table)
|
|
|
|
|
|
|
+ return SYNC_GEN.query_columns_full(conn, schema, table)
|
|
|
finally:
|
|
finally:
|
|
|
conn.close()
|
|
conn.close()
|
|
|
- return {name: (comment or '') for _, name, comment, _, _ in rows}
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def normalize_pg_type(pg_type):
|
|
|
|
|
+ """PG type → 映射 conf 查询用的 normalized key。
|
|
|
|
|
+
|
|
|
|
|
+ 规则:
|
|
|
|
|
+ - 小写 + 去首尾空格
|
|
|
|
|
+ - 去括号参数:'numeric(12,2)' → 'numeric','character varying(64)' → 'character varying'
|
|
|
|
|
+ - 去时区后缀:'timestamp(6) without time zone' → 'timestamp'
|
|
|
|
|
+ """
|
|
|
|
|
+ t = pg_type.lower().strip()
|
|
|
|
|
+ if '(' in t and ')' in t:
|
|
|
|
|
+ before = t[:t.index('(')].strip()
|
|
|
|
|
+ after = t[t.index(')') + 1:].strip()
|
|
|
|
|
+ t = (before + ' ' + after).strip()
|
|
|
|
|
+ for suffix in ('without time zone', 'with time zone'):
|
|
|
|
|
+ if t.endswith(suffix):
|
|
|
|
|
+ t = t[:-len(suffix)].strip()
|
|
|
|
|
+ return t
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def load_type_mapping(conf_path):
|
|
|
|
|
+ """读 conf/pg-to-hive-type.ini 的 [mapping] 段,返回 {normalized_pg_type: hive_type}。"""
|
|
|
|
|
+ if not os.path.isfile(conf_path):
|
|
|
|
|
+ raise FileNotFoundError('类型映射 conf 不存在: ' + conf_path)
|
|
|
|
|
+ cp = ConfigParser()
|
|
|
|
|
+ cp.read(conf_path, encoding='utf-8')
|
|
|
|
|
+ if not cp.has_section('mapping'):
|
|
|
|
|
+ raise KeyError('类型映射 conf 缺 [mapping] 段: ' + conf_path)
|
|
|
|
|
+ return dict(cp.items('mapping'))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def map_pg_to_hive(pg_type, type_mapping):
|
|
|
|
|
+ """PG 字段类型映射到 Hive 类型;未命中报错让人显式补规则。"""
|
|
|
|
|
+ key = normalize_pg_type(pg_type)
|
|
|
|
|
+ if key not in type_mapping:
|
|
|
|
|
+ raise KeyError(
|
|
|
|
|
+ "PG 类型 '{}'(normalized '{}')不在 conf/pg-to-hive-type.ini 映射表,"
|
|
|
|
|
+ "需显式补规则".format(pg_type, key))
|
|
|
|
|
+ return type_mapping[key]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def reverse_ods_table_name(raw_table_name):
|
|
|
|
|
+ """raw_xxx → ods_xxx;首段必须是 'raw_'。"""
|
|
|
|
|
+ if not raw_table_name.startswith('raw_'):
|
|
|
|
|
+ raise ValueError("raw 表名首段必须是 'raw_': " + raw_table_name)
|
|
|
|
|
+ return 'ods_' + raw_table_name[len('raw_'):]
|
|
|
|
|
|
|
|
|
|
|
|
|
def render_raw_ddl(table_name, columns, comment_dict):
|
|
def render_raw_ddl(table_name, columns, comment_dict):
|
|
@@ -166,14 +230,66 @@ def render_raw_ddl(table_name, columns, comment_dict):
|
|
|
return '\n'.join(lines)
|
|
return '\n'.join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+def render_ods_ddl(raw_table_name, columns, full_rows, type_mapping):
|
|
|
|
|
+ """渲染 ods 层 DDL:typed 字段 + is_deleted 归一 + dt 分区 + ORC + EXTERNAL。
|
|
|
|
|
+
|
|
|
|
|
+ full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...] 来自 query_columns_full
|
|
|
|
|
+ columns: sync ini reader.column 裁剪后字段列表(与 full_rows 字段名子集对齐)
|
|
|
|
|
+ type_mapping: load_type_mapping 返回的 {normalized_pg_type: hive_type}
|
|
|
|
|
+
|
|
|
|
|
+ 字段顺序按 columns(不依赖 full_rows attnum);缺类型 / 缺注释报错。
|
|
|
|
|
+ 末尾加 is_deleted BOOLEAN 软删归一字段(注释固定)。
|
|
|
|
|
+ """
|
|
|
|
|
+ today = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
|
+ ods_table_name = reverse_ods_table_name(raw_table_name)
|
|
|
|
|
+ width = max(len(c) for c in columns + ['is_deleted']) + 4
|
|
|
|
|
+
|
|
|
|
|
+ by_name = {r[1]: (r[3], r[2] or '') for r in full_rows}
|
|
|
|
|
+ missing = [c for c in columns if c not in by_name]
|
|
|
|
|
+ if missing:
|
|
|
|
|
+ raise KeyError('reader.column 中字段 PG 元数据缺失: ' + ','.join(missing))
|
|
|
|
|
+
|
|
|
|
|
+ lines = [
|
|
|
|
|
+ '-- 作者:<TODO>',
|
|
|
|
|
+ '-- 日期:' + today,
|
|
|
|
|
+ '-- 工单:<TODO>',
|
|
|
|
|
+ '-- 目的:<TODO>',
|
|
|
|
|
+ '-- 状态:[待执行]',
|
|
|
|
|
+ '-- 备注:<TODO>',
|
|
|
|
|
+ '',
|
|
|
|
|
+ 'DROP TABLE IF EXISTS ods.' + ods_table_name + ';',
|
|
|
|
|
+ '',
|
|
|
|
|
+ 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.' + ods_table_name + ' (',
|
|
|
|
|
+ ]
|
|
|
|
|
+ type_width = max(len(map_pg_to_hive(by_name[c][0], type_mapping)) for c in columns)
|
|
|
|
|
+ type_width = max(type_width, len('BOOLEAN')) + 2
|
|
|
|
|
+ for col in columns:
|
|
|
|
|
+ pg_type, comment = by_name[col]
|
|
|
|
|
+ hive_type = map_pg_to_hive(pg_type, type_mapping)
|
|
|
|
|
+ comment = comment.replace("'", "''")
|
|
|
|
|
+ lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '{c}',".format(
|
|
|
|
|
+ col=col, w=width, ht=hive_type, tw=type_width, c=comment))
|
|
|
|
|
+ lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '软删除归一(CASE WHEN del_* THEN TRUE)'".format(
|
|
|
|
|
+ col='is_deleted', w=width, ht='BOOLEAN', tw=type_width))
|
|
|
|
|
+ lines.extend([
|
|
|
|
|
+ ')',
|
|
|
|
|
+ "COMMENT '<TODO>'",
|
|
|
|
|
+ 'PARTITIONED BY (dt STRING)',
|
|
|
|
|
+ 'STORED AS ORC',
|
|
|
|
|
+ "LOCATION '/user/hive/warehouse/ods.db/" + ods_table_name + "';",
|
|
|
|
|
+ '',
|
|
|
|
|
+ ])
|
|
|
|
|
+ return '\n'.join(lines)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def main():
|
|
def main():
|
|
|
parser = argparse.ArgumentParser(
|
|
parser = argparse.ArgumentParser(
|
|
|
prog='hive-ddl-gen',
|
|
prog='hive-ddl-gen',
|
|
|
- description='Hive DDL 生成器(raw 层;ods 层占位待实施)',
|
|
|
|
|
|
|
+ description='Hive DDL 生成器(raw / ods 双层)',
|
|
|
)
|
|
)
|
|
|
parser.add_argument('-l', required=True, choices=['raw', 'ods'],
|
|
parser.add_argument('-l', required=True, choices=['raw', 'ods'],
|
|
|
metavar='LAYER',
|
|
metavar='LAYER',
|
|
|
- help='层级(raw 必填;ods 暂未实施直接报错)')
|
|
|
|
|
|
|
+ help='层级(raw / ods,必填)')
|
|
|
parser.add_argument('-ini', required=True, metavar='PATH',
|
|
parser.add_argument('-ini', required=True, metavar='PATH',
|
|
|
help='sync ini 路径(按项目根解析相对路径)')
|
|
help='sync ini 路径(按项目根解析相对路径)')
|
|
|
parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
|
|
parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
|
|
@@ -181,22 +297,28 @@ def main():
|
|
|
help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
|
|
help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
|
|
|
args = parser.parse_args()
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
- if args.l == 'ods':
|
|
|
|
|
- raise NotImplementedError('ods 层 DDL 生成暂未实施(ADR-06)')
|
|
|
|
|
-
|
|
|
|
|
ini_path = _resolve_to_project_root(args.ini)
|
|
ini_path = _resolve_to_project_root(args.ini)
|
|
|
spec = parse_sync_ini(ini_path)
|
|
spec = parse_sync_ini(ini_path)
|
|
|
table_name = reverse_table_name(spec['writer_path'])
|
|
table_name = reverse_table_name(spec['writer_path'])
|
|
|
- comment_dict = fetch_column_comments(
|
|
|
|
|
- spec['ds_ref'], spec['schema'], spec['table'])
|
|
|
|
|
|
|
|
|
|
- ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
|
|
|
|
|
|
|
+ if args.l == 'raw':
|
|
|
|
|
+ comment_dict = fetch_column_comments(
|
|
|
|
|
+ spec['ds_ref'], spec['schema'], spec['table'])
|
|
|
|
|
+ ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
|
|
|
|
|
+ out_table_name = table_name
|
|
|
|
|
+ else:
|
|
|
|
|
+ full_rows = fetch_column_full_rows(
|
|
|
|
|
+ spec['ds_ref'], spec['schema'], spec['table'])
|
|
|
|
|
+ type_mapping = load_type_mapping(
|
|
|
|
|
+ os.path.join(project_root, 'conf', 'pg-to-hive-type.ini'))
|
|
|
|
|
+ ddl = render_ods_ddl(table_name, spec['columns'], full_rows, type_mapping)
|
|
|
|
|
+ out_table_name = reverse_ods_table_name(table_name)
|
|
|
|
|
|
|
|
sys.stdout.write(ddl)
|
|
sys.stdout.write(ddl)
|
|
|
|
|
|
|
|
if args.o is not None:
|
|
if args.o is not None:
|
|
|
os.makedirs(args.o, exist_ok=True)
|
|
os.makedirs(args.o, exist_ok=True)
|
|
|
- out_path = os.path.join(args.o, table_name + '_create.sql')
|
|
|
|
|
|
|
+ out_path = os.path.join(args.o, out_table_name + '_create.sql')
|
|
|
with open(out_path, 'w', encoding='utf-8') as f:
|
|
with open(out_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(ddl)
|
|
f.write(ddl)
|
|
|
print('已写入: ' + out_path, file=sys.stderr)
|
|
print('已写入: ' + out_path, file=sys.stderr)
|