hive-ddl-gen.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. Hive DDL 生成器(raw 层;ods 层占位待实施)。
  5. 输入 sync ini,从 PG 抽字段中文注释,按 reader.column 顺序渲染
  6. 全字段 STRING + dt STRING 分区 + ORC + EXTERNAL TABLE,写到 stdout
  7. (传 -o 时额外落盘 {table_name}_create.sql)。
  8. CLI:
  9. python3 bin/hive-ddl-gen.py -l raw -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
  10. 参数:
  11. -l 层级(raw 必填;ods 暂未实施,传入直接 NotImplementedError)
  12. -ini sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
  13. -o 输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
  14. workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)
  15. 表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位)。
  16. """
  17. import argparse
  18. import importlib.util
  19. import os
  20. import sys
  21. from configparser import ConfigParser
  22. from datetime import datetime
  23. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  24. sys.path.append(project_root)
  25. def _load_sync_gen():
  26. """复用 datax-sync-template-gen 的 resolve_datasource / parse_jdbc_url /
  27. query_columns_full / DS_POSTGRE_SQL_JDBC_URL(脚本名含连字符,importlib 加载)。
  28. """
  29. spec = importlib.util.spec_from_file_location(
  30. 'datax_sync_template_gen',
  31. os.path.join(project_root, 'bin', 'datax-sync-template-gen.py'),
  32. )
  33. mod = importlib.util.module_from_spec(spec)
  34. spec.loader.exec_module(mod)
  35. return mod
  36. SYNC_GEN = _load_sync_gen()
  37. WORKSPACE_DEFAULT = os.path.join(
  38. project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
  39. )
  40. def _resolve_to_project_root(path):
  41. if os.path.isabs(path):
  42. return path
  43. return os.path.join(project_root, path)
  44. def parse_sync_ini(path):
  45. """解析 sync ini,提取 raw DDL 渲染所需 5 项。"""
  46. if not os.path.isfile(path):
  47. raise FileNotFoundError('sync ini 不存在: ' + path)
  48. cp = ConfigParser()
  49. cp.read(path, encoding='utf-8')
  50. if not cp.has_section('reader'):
  51. raise KeyError('sync ini 缺 [reader] 段: ' + path)
  52. if not cp.has_section('writer'):
  53. raise KeyError('sync ini 缺 [writer] 段: ' + path)
  54. ds_ref = cp.get('reader', 'dataSource').strip()
  55. schema_table = cp.get('reader', 'table').strip()
  56. columns_str = cp.get('reader', 'column').strip()
  57. writer_path = cp.get('writer', 'path').strip()
  58. if '.' not in schema_table:
  59. raise ValueError('reader.table 必须 schema.table 格式: ' + schema_table)
  60. schema, table = schema_table.split('.', 1)
  61. columns = [c.strip() for c in columns_str.split(',') if c.strip()]
  62. if not columns:
  63. raise ValueError('reader.column 为空')
  64. return {
  65. 'ds_ref': ds_ref,
  66. 'schema': schema,
  67. 'table': table,
  68. 'columns': columns,
  69. 'writer_path': writer_path,
  70. }
  71. def reverse_table_name(writer_path):
  72. """从 writer.path 反推 Hive 表名。
  73. path 形如 /user/hive/warehouse/raw.db/{table_name}/dt=${dt}/
  74. 末段必须是 dt=... 占位,倒数第二段即表名。
  75. """
  76. p = writer_path.rstrip('/')
  77. parts = p.rsplit('/', 1)
  78. if len(parts) != 2 or not parts[1].startswith('dt='):
  79. raise ValueError(
  80. 'writer.path 末段必须是 dt=...,无法反推表名: ' + writer_path)
  81. return parts[0].rsplit('/', 1)[-1]
  82. def fetch_column_comments(ds_ref, schema, table):
  83. """连 PG 拿 schema.table 的 {字段名: 中文注释} dict。
  84. 复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
  85. """
  86. ds = SYNC_GEN.resolve_datasource(ds_ref)
  87. ds_dict = ds.parse()
  88. jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
  89. user = ds_dict['username']
  90. password = ds_dict['password']
  91. host, port, database = SYNC_GEN.parse_jdbc_url(jdbc_url)
  92. import pg8000.dbapi
  93. conn = pg8000.dbapi.connect(
  94. host=host, port=port, database=database,
  95. user=user, password=password,
  96. )
  97. try:
  98. rows = SYNC_GEN.query_columns_full(conn, schema, table)
  99. finally:
  100. conn.close()
  101. return {name: (comment or '') for _, name, comment, _, _ in rows}
  102. def render_raw_ddl(table_name, columns, comment_dict):
  103. """渲染 raw 层 DDL:全字段 STRING + dt STRING 分区 + ORC + EXTERNAL。
  104. 字段顺序严格按 columns(已是 sync ini reader.column 裁剪后顺序);
  105. 字段注释从 comment_dict 按字段名查,缺失留空字符串。
  106. """
  107. today = datetime.now().strftime('%Y-%m-%d')
  108. width = max(len(c) for c in columns) + 4
  109. lines = [
  110. '-- 作者:<TODO>',
  111. '-- 日期:' + today,
  112. '-- 工单:<TODO>',
  113. '-- 目的:<TODO>',
  114. '-- 状态:[待执行]',
  115. '-- 备注:<TODO>',
  116. '',
  117. 'DROP TABLE IF EXISTS raw.' + table_name + ';',
  118. '',
  119. 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.' + table_name + ' (',
  120. ]
  121. last_idx = len(columns) - 1
  122. for i, col in enumerate(columns):
  123. comma = ',' if i < last_idx else ''
  124. comment = comment_dict.get(col, '').replace("'", "''")
  125. lines.append(" {col:<{w}}STRING{comma} COMMENT '{comment}'".format(
  126. col=col, w=width, comma=comma, comment=comment))
  127. lines.extend([
  128. ')',
  129. "COMMENT '<TODO>'",
  130. 'PARTITIONED BY (dt STRING)',
  131. 'STORED AS ORC',
  132. "LOCATION '/user/hive/warehouse/raw.db/" + table_name + "';",
  133. '',
  134. ])
  135. return '\n'.join(lines)
  136. def main():
  137. parser = argparse.ArgumentParser(
  138. prog='hive-ddl-gen',
  139. description='Hive DDL 生成器(raw 层;ods 层占位待实施)',
  140. )
  141. parser.add_argument('-l', required=True, choices=['raw', 'ods'],
  142. metavar='LAYER',
  143. help='层级(raw 必填;ods 暂未实施直接报错)')
  144. parser.add_argument('-ini', required=True, metavar='PATH',
  145. help='sync ini 路径(按项目根解析相对路径)')
  146. parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
  147. metavar='DIR',
  148. help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
  149. args = parser.parse_args()
  150. if args.l == 'ods':
  151. raise NotImplementedError('ods 层 DDL 生成暂未实施(ADR-06)')
  152. ini_path = _resolve_to_project_root(args.ini)
  153. spec = parse_sync_ini(ini_path)
  154. table_name = reverse_table_name(spec['writer_path'])
  155. comment_dict = fetch_column_comments(
  156. spec['ds_ref'], spec['schema'], spec['table'])
  157. ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
  158. sys.stdout.write(ddl)
  159. if args.o is not None:
  160. os.makedirs(args.o, exist_ok=True)
  161. out_path = os.path.join(args.o, table_name + '_create.sql')
  162. with open(out_path, 'w', encoding='utf-8') as f:
  163. f.write(ddl)
  164. print('已写入: ' + out_path, file=sys.stderr)
  165. if __name__ == '__main__':
  166. main()