hive-ddl-gen.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. Hive DDL 生成器(raw / ods 双层)。
  5. **仅支持 PG 源**:reader.dataSource 必须是 `postgresql/{env}-{instance}`
  6. 形式;mysql 等其他源由复用的 datax-sync-template-gen.resolve_datasource
  7. 直接 NotImplementedError。
  8. 输入 sync ini,从 PG 抽字段类型 + 中文注释:
  9. - raw 层:按 reader.column 顺序渲染全字段 STRING + dt STRING 分区 + ORC + EXTERNAL
  10. - ods 层:按 reader.column 顺序应用 conf/pg-to-hive-type.ini 类型映射,
  11. 末尾加 is_deleted BOOLEAN 软删归一字段,dt STRING 分区 + ORC + EXTERNAL;
  12. 不加 etl_time / src_sys / src_tbl 技术字段(详见 ADR-06)
  13. 写到 stdout(传 -o 时额外落盘 {table_name}_create.sql)。
  14. CLI:
  15. python3 bin/hive-ddl-gen.py -l {raw|ods} -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
  16. 参数:
  17. -l 层级(raw / ods,必填)
  18. -ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
  19. -o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
  20. workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)
  21. 表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位);
  22. ods 表名 = raw 表名首段 'raw_' 替换为 'ods_'。
  23. """
  24. import argparse
  25. import importlib.util
  26. import os
  27. import sys
  28. from configparser import ConfigParser
  29. from datetime import datetime
  30. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  31. sys.path.append(project_root)
  32. def _load_sync_gen():
  33. """复用 datax-sync-template-gen 的 resolve_datasource / parse_jdbc_url /
  34. query_columns_full / DS_POSTGRE_SQL_JDBC_URL(脚本名含连字符,importlib 加载)。
  35. """
  36. spec = importlib.util.spec_from_file_location(
  37. 'datax_sync_template_gen',
  38. os.path.join(project_root, 'bin', 'datax-sync-template-gen.py'),
  39. )
  40. mod = importlib.util.module_from_spec(spec)
  41. spec.loader.exec_module(mod)
  42. return mod
  43. SYNC_GEN = _load_sync_gen()
  44. WORKSPACE_DEFAULT = os.path.join(
  45. project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
  46. )
  47. def _resolve_to_project_root(path):
  48. if os.path.isabs(path):
  49. return path
  50. return os.path.join(project_root, path)
  51. def parse_sync_ini(path):
  52. """解析 sync ini,提取 raw DDL 渲染所需 5 项。"""
  53. if not os.path.isfile(path):
  54. raise FileNotFoundError('sync ini 不存在: ' + path)
  55. cp = ConfigParser()
  56. cp.read(path, encoding='utf-8')
  57. if not cp.has_section('reader'):
  58. raise KeyError('sync ini 缺 [reader] 段: ' + path)
  59. if not cp.has_section('writer'):
  60. raise KeyError('sync ini 缺 [writer] 段: ' + path)
  61. ds_ref = cp.get('reader', 'dataSource').strip()
  62. schema_table = cp.get('reader', 'table').strip()
  63. columns_str = cp.get('reader', 'column').strip()
  64. writer_path = cp.get('writer', 'path').strip()
  65. if '.' not in schema_table:
  66. raise ValueError('reader.table 必须 schema.table 格式: ' + schema_table)
  67. schema, table = schema_table.split('.', 1)
  68. columns = [c.strip() for c in columns_str.split(',') if c.strip()]
  69. if not columns:
  70. raise ValueError('reader.column 为空')
  71. return {
  72. 'ds_ref': ds_ref,
  73. 'schema': schema,
  74. 'table': table,
  75. 'columns': columns,
  76. 'writer_path': writer_path,
  77. }
  78. def reverse_table_name(writer_path):
  79. """从 writer.path 反推 Hive 表名。
  80. path 形如 /user/hive/warehouse/raw.db/{table_name}/dt=${dt}/
  81. 末段必须是 dt=... 占位,倒数第二段即表名。
  82. """
  83. p = writer_path.rstrip('/')
  84. parts = p.rsplit('/', 1)
  85. if len(parts) != 2 or not parts[1].startswith('dt='):
  86. raise ValueError(
  87. 'writer.path 末段必须是 dt=...,无法反推表名: ' + writer_path)
  88. return parts[0].rsplit('/', 1)[-1]
  89. def fetch_column_comments(ds_ref, schema, table):
  90. """连 PG 拿 schema.table 的 {字段名: 中文注释} dict。
  91. 复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
  92. """
  93. rows = _fetch_pg_column_rows(ds_ref, schema, table)
  94. return {name: (comment or '') for _, name, comment, _, _ in rows}
  95. def fetch_column_full_rows(ds_ref, schema, table):
  96. """连 PG 拿 schema.table 的全字段 [(attnum, attname, comment, pg_type, pk_flag), ...]。
  97. ods 渲染需要 pg_type,raw 只用 comment。本函数返回原始 rows 给 ods 用。
  98. """
  99. return _fetch_pg_column_rows(ds_ref, schema, table)
  100. def _fetch_pg_column_rows(ds_ref, schema, table):
  101. ds = SYNC_GEN.resolve_datasource(ds_ref)
  102. ds_dict = ds.parse()
  103. jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
  104. user = ds_dict['username']
  105. password = ds_dict['password']
  106. host, port, database = SYNC_GEN.parse_jdbc_url(jdbc_url)
  107. import pg8000.dbapi
  108. conn = pg8000.dbapi.connect(
  109. host=host, port=port, database=database,
  110. user=user, password=password,
  111. )
  112. try:
  113. return SYNC_GEN.query_columns_full(conn, schema, table)
  114. finally:
  115. conn.close()
  116. def normalize_pg_type(pg_type):
  117. """PG type → 映射 conf 查询用的 normalized key。
  118. 规则:
  119. - 小写 + 去首尾空格
  120. - 去括号参数:'numeric(12,2)' → 'numeric','character varying(64)' → 'character varying'
  121. - 去时区后缀:'timestamp(6) without time zone' → 'timestamp'
  122. """
  123. t = pg_type.lower().strip()
  124. if '(' in t and ')' in t:
  125. before = t[:t.index('(')].strip()
  126. after = t[t.index(')') + 1:].strip()
  127. t = (before + ' ' + after).strip()
  128. for suffix in ('without time zone', 'with time zone'):
  129. if t.endswith(suffix):
  130. t = t[:-len(suffix)].strip()
  131. return t
  132. def load_type_mapping(conf_path):
  133. """读 conf/pg-to-hive-type.ini 的 [mapping] 段,返回 {normalized_pg_type: hive_type}。"""
  134. if not os.path.isfile(conf_path):
  135. raise FileNotFoundError('类型映射 conf 不存在: ' + conf_path)
  136. cp = ConfigParser()
  137. cp.read(conf_path, encoding='utf-8')
  138. if not cp.has_section('mapping'):
  139. raise KeyError('类型映射 conf 缺 [mapping] 段: ' + conf_path)
  140. return dict(cp.items('mapping'))
  141. def map_pg_to_hive(pg_type, type_mapping):
  142. """PG 字段类型映射到 Hive 类型;未命中报错让人显式补规则。"""
  143. key = normalize_pg_type(pg_type)
  144. if key not in type_mapping:
  145. raise KeyError(
  146. "PG 类型 '{}'(normalized '{}')不在 conf/pg-to-hive-type.ini 映射表,"
  147. "需显式补规则".format(pg_type, key))
  148. return type_mapping[key]
  149. def reverse_ods_table_name(raw_table_name):
  150. """raw_xxx → ods_xxx;首段必须是 'raw_'。"""
  151. if not raw_table_name.startswith('raw_'):
  152. raise ValueError("raw 表名首段必须是 'raw_': " + raw_table_name)
  153. return 'ods_' + raw_table_name[len('raw_'):]
  154. def render_raw_ddl(table_name, columns, comment_dict):
  155. """渲染 raw 层 DDL:全字段 STRING + dt STRING 分区 + ORC + EXTERNAL。
  156. 字段顺序严格按 columns(已是 sync ini reader.column 裁剪后顺序);
  157. 字段注释从 comment_dict 按字段名查,缺失留空字符串。
  158. """
  159. today = datetime.now().strftime('%Y-%m-%d')
  160. width = max(len(c) for c in columns) + 4
  161. lines = [
  162. '-- 作者:<TODO>',
  163. '-- 日期:' + today,
  164. '-- 工单:<TODO>',
  165. '-- 目的:<TODO>',
  166. '-- 状态:[待执行]',
  167. '-- 备注:<TODO>',
  168. '',
  169. 'DROP TABLE IF EXISTS raw.' + table_name + ';',
  170. '',
  171. 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.' + table_name + ' (',
  172. ]
  173. last_idx = len(columns) - 1
  174. for i, col in enumerate(columns):
  175. comma = ',' if i < last_idx else ''
  176. comment = comment_dict.get(col, '').replace("'", "''")
  177. lines.append(" {col:<{w}}STRING COMMENT '{comment}'{comma}".format(
  178. col=col, w=width, comma=comma, comment=comment))
  179. lines.extend([
  180. ')',
  181. "COMMENT '<TODO>'",
  182. 'PARTITIONED BY (dt STRING)',
  183. 'STORED AS ORC',
  184. "LOCATION '/user/hive/warehouse/raw.db/" + table_name + "';",
  185. '',
  186. ])
  187. return '\n'.join(lines)
  188. def render_ods_ddl(raw_table_name, columns, full_rows, type_mapping):
  189. """渲染 ods 层 DDL:typed 字段 + is_deleted 归一 + dt 分区 + ORC + EXTERNAL。
  190. full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...] 来自 query_columns_full
  191. columns: sync ini reader.column 裁剪后字段列表(与 full_rows 字段名子集对齐)
  192. type_mapping: load_type_mapping 返回的 {normalized_pg_type: hive_type}
  193. 字段顺序按 columns(不依赖 full_rows attnum);缺类型 / 缺注释报错。
  194. 末尾加 is_deleted BOOLEAN 软删归一字段(注释固定)。
  195. """
  196. today = datetime.now().strftime('%Y-%m-%d')
  197. ods_table_name = reverse_ods_table_name(raw_table_name)
  198. width = max(len(c) for c in columns + ['is_deleted']) + 4
  199. by_name = {r[1]: (r[3], r[2] or '') for r in full_rows}
  200. missing = [c for c in columns if c not in by_name]
  201. if missing:
  202. raise KeyError('reader.column 中字段 PG 元数据缺失: ' + ','.join(missing))
  203. lines = [
  204. '-- 作者:<TODO>',
  205. '-- 日期:' + today,
  206. '-- 工单:<TODO>',
  207. '-- 目的:<TODO>',
  208. '-- 状态:[待执行]',
  209. '-- 备注:<TODO>',
  210. '',
  211. 'DROP TABLE IF EXISTS ods.' + ods_table_name + ';',
  212. '',
  213. 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.' + ods_table_name + ' (',
  214. ]
  215. type_width = max(len(map_pg_to_hive(by_name[c][0], type_mapping)) for c in columns)
  216. type_width = max(type_width, len('BOOLEAN')) + 2
  217. for col in columns:
  218. pg_type, comment = by_name[col]
  219. hive_type = map_pg_to_hive(pg_type, type_mapping)
  220. comment = comment.replace("'", "''")
  221. lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '{c}',".format(
  222. col=col, w=width, ht=hive_type, tw=type_width, c=comment))
  223. lines.append(" {col:<{w}}{ht:<{tw}}COMMENT '软删除归一(CASE WHEN del_* THEN TRUE)'".format(
  224. col='is_deleted', w=width, ht='BOOLEAN', tw=type_width))
  225. lines.extend([
  226. ')',
  227. "COMMENT '<TODO>'",
  228. 'PARTITIONED BY (dt STRING)',
  229. 'STORED AS ORC',
  230. "LOCATION '/user/hive/warehouse/ods.db/" + ods_table_name + "';",
  231. '',
  232. ])
  233. return '\n'.join(lines)
  234. def main():
  235. parser = argparse.ArgumentParser(
  236. prog='hive-ddl-gen',
  237. description='Hive DDL 生成器(raw / ods 双层)',
  238. )
  239. parser.add_argument('-l', required=True, choices=['raw', 'ods'],
  240. metavar='LAYER',
  241. help='层级(raw / ods,必填)')
  242. parser.add_argument('-ini', required=True, metavar='PATH',
  243. help='sync ini 路径(按项目根解析相对路径)')
  244. parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
  245. metavar='DIR',
  246. help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
  247. args = parser.parse_args()
  248. ini_path = _resolve_to_project_root(args.ini)
  249. spec = parse_sync_ini(ini_path)
  250. table_name = reverse_table_name(spec['writer_path'])
  251. if args.l == 'raw':
  252. comment_dict = fetch_column_comments(
  253. spec['ds_ref'], spec['schema'], spec['table'])
  254. ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
  255. out_table_name = table_name
  256. else:
  257. full_rows = fetch_column_full_rows(
  258. spec['ds_ref'], spec['schema'], spec['table'])
  259. type_mapping = load_type_mapping(
  260. os.path.join(project_root, 'conf', 'pg-to-hive-type.ini'))
  261. ddl = render_ods_ddl(table_name, spec['columns'], full_rows, type_mapping)
  262. out_table_name = reverse_ods_table_name(table_name)
  263. sys.stdout.write(ddl)
  264. if args.o is not None:
  265. os.makedirs(args.o, exist_ok=True)
  266. out_path = os.path.join(args.o, out_table_name + '_create.sql')
  267. with open(out_path, 'w', encoding='utf-8') as f:
  268. f.write(ddl)
  269. print('已写入: ' + out_path, file=sys.stderr)
  270. if __name__ == '__main__':
  271. main()