datax-sync-template-gen.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表。
  5. 一次跑同时产出两件:
  6. 1. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/
  7. 裁剪类型空列)—— 用于 kb/24 raw 建模文档讨论字段裁剪
  8. 2. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where /
  9. 加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/
  10. CLI:
  11. python3 bin/datax-sync-template-gen.py \\
  12. -ds postgresql/prod-hobby \\
  13. -t public.card_group_order_info \\
  14. [-o [DIR]]
  15. 参数:
  16. -ds 数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里
  17. dataSource 字段格式)。暂只支持 postgresql。
  18. -t schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
  19. -o 输出目录(可选):
  20. - 不传:stdout 同时打印 md 表 + ini 模板
  21. - 传 -o 不带值:workspace/{yyyymmdd}/,写两个文件 {table}.md + {table}.ini
  22. - 传 -o <DIR>:自定义目录,写两个文件
  23. """
  24. import argparse
  25. import os
  26. import re
  27. import sys
  28. from configparser import ConfigParser
  29. from datetime import datetime
  30. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  31. sys.path.append(project_root)
  32. from dw_base.datax.datasources.data_source_factory import DataSourceFactory
  33. from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL
  34. WORKSPACE_DEFAULT = os.path.join(
  35. project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
  36. )
  37. def resolve_datasource(ds_ref):
  38. """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
  39. ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。
  40. datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。
  41. """
  42. ds_type = ds_ref.split('/')[0]
  43. if ds_type != 'postgresql':
  44. raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
  45. ds_file_path = os.path.normpath(
  46. os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
  47. if not os.path.isfile(ds_file_path):
  48. raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
  49. return DataSourceFactory.get_data_source(ds_type, ds_file_path)
  50. def parse_jdbc_url(jdbc_url):
  51. """从 jdbc:postgresql://host:port/database 抽 (host, port, database)。"""
  52. m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
  53. if not m:
  54. raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
  55. return m.group(1), int(m.group(2) or 5432), m.group(3)
  56. def query_columns_full(conn, schema, table):
  57. """带序号 / 类型 / 主键标识的全字段 metadata 查询,按 attnum 排序。
  58. 返回 [(attnum, attname, comment, pg_type, pk_flag), ...]
  59. """
  60. cur = conn.cursor()
  61. cur.execute("""
  62. SELECT
  63. a.attnum,
  64. a.attname,
  65. pg_catalog.col_description(a.attrelid, a.attnum),
  66. pg_catalog.format_type(a.atttypid, a.atttypmod),
  67. CASE WHEN EXISTS (
  68. SELECT 1 FROM pg_index i
  69. WHERE i.indrelid = a.attrelid AND i.indisprimary
  70. AND a.attnum = ANY(i.indkey)
  71. ) THEN 'PK' ELSE '' END
  72. FROM pg_catalog.pg_attribute a
  73. JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
  74. JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
  75. WHERE n.nspname = %s AND c.relname = %s
  76. AND a.attnum > 0 AND NOT a.attisdropped
  77. ORDER BY a.attnum
  78. """, (schema, table))
  79. return cur.fetchall()
  80. def load_mask_conf(path):
  81. """读 mask 配置 ini,返回 {field: method} dict。
  82. 格式(与 jobs/raw/{域}/{table}.mask.ini 同款):
  83. [mask]
  84. field1 = method1
  85. field2 = method2
  86. method ∈ trim / md5 / month_trunc / mask_middle / keep_first_n / keep_last_n
  87. - trim:整字段不入 raw(reader column 不查询)
  88. - 其他:字段入 raw,由 dw_base.datax.mask 在 reader 端脱敏
  89. """
  90. cp = ConfigParser()
  91. cp.read(path, encoding='utf-8')
  92. if not cp.has_section('mask'):
  93. return {}
  94. return dict(cp.items('mask'))
  95. def render_schema_md(rows, mask_dict=None):
  96. """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 脱敏类型。
  97. mask_dict 不传时脱敏类型列为空白;传入时填字段对应的 method(含 trim)。
  98. """
  99. lines = [
  100. '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |',
  101. '| --- | --- | --- | --- | --- | --- |',
  102. ]
  103. methods = mask_dict or {}
  104. for num, name, comment, typ, pk in rows:
  105. method = methods.get(name, '')
  106. lines.append('| {} | `{}` | {} | {} | {} | {} |'.format(
  107. num, name, comment or '', typ, pk, method))
  108. return '\n'.join(lines) + '\n'
  109. def render_template(ds_ref, database, schema, table, columns, pk, mask_methods=None):
  110. """渲染 sync ini 模板。
  111. columns: [(name, comment), ...] 已剔除 trim 字段,保持 PG 原顺序
  112. mask_methods: {field: method} 仅含非 trim 方法(mask_middle / month_trunc 等),
  113. 渲染 [mask] 段;空 dict 或 None 时不渲染 [mask] 段
  114. """
  115. column_str = ','.join(c for c, _ in columns)
  116. today = datetime.now().strftime('%Y-%m-%d')
  117. if mask_methods:
  118. mask_lines = '\n'.join('{} = {}'.format(f, m) for f, m in mask_methods.items())
  119. mask_section = '[mask]\n' + mask_lines + '\n\n'
  120. else:
  121. mask_section = ''
  122. return (
  123. '; 作者:<TODO>\n'
  124. '; 日期:{today}\n'
  125. '; 工单:<TODO>\n'
  126. '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
  127. '; 状态:[待执行]\n'
  128. '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 mask 段 /\n'
  129. '; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
  130. ';\n'
  131. '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
  132. '\n'
  133. '[reader]\n'
  134. 'dataSource = {ds_ref}\n'
  135. 'database = {database}\n'
  136. 'table = {schema}.{table}\n'
  137. 'column = {column_str}\n'
  138. 'columnType =\n'
  139. "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
  140. 'querySql =\n'
  141. 'splitPk = {pk}\n'
  142. 'fetchSize = 1000\n'
  143. '\n'
  144. '{mask_section}'
  145. '[writer]\n'
  146. 'dataSource = hdfs/<TODO>\n'
  147. 'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
  148. 'column = {column_str}\n'
  149. 'columnType =\n'
  150. 'fileType = orc\n'
  151. 'fileName = {table}_TODO_d\n'
  152. 'encoding = UTF-8\n'
  153. 'writeMode = truncate\n'
  154. 'fieldDelimiter = \\t\n'
  155. ).format(
  156. today=today, ds_ref=ds_ref, database=database, schema=schema,
  157. table=table, column_str=column_str, pk=pk, mask_section=mask_section,
  158. )
  159. def main():
  160. parser = argparse.ArgumentParser(
  161. prog='datax-sync-template-gen',
  162. description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
  163. )
  164. parser.add_argument('-ds', required=True, metavar='DS_REF',
  165. help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
  166. parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
  167. help='schema 限定的表名(如 public.card_group_order_info)')
  168. parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
  169. help='输出目录(不传 stdout 同时打印 md 表 + ini 模板;不带值 workspace/{yyyymmdd}/ 写两文件;带值自定义目录写两文件)')
  170. parser.add_argument('--mask-conf', default=None, metavar='PATH',
  171. help='mask 配置 ini 路径({table}.mask.ini)。传入时按配置剔除 trim 字段 + 渲染 [mask] 段,md 脱敏类型列填好;不传时全字段输出,md 脱敏类型列空白')
  172. args = parser.parse_args()
  173. if '.' not in args.t:
  174. print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
  175. sys.exit(2)
  176. schema, table = args.t.split('.', 1)
  177. ds = resolve_datasource(args.ds)
  178. ds_dict = ds.parse()
  179. jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
  180. user = ds_dict['username']
  181. password = ds_dict['password']
  182. host, port, database = parse_jdbc_url(jdbc_url)
  183. import pg8000.dbapi
  184. conn = pg8000.dbapi.connect(
  185. host=host, port=port, database=database,
  186. user=user, password=password,
  187. )
  188. try:
  189. full_rows = query_columns_full(conn, schema, table)
  190. if not full_rows:
  191. raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
  192. finally:
  193. conn.close()
  194. # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
  195. mask_dict = load_mask_conf(args.mask_conf) if args.mask_conf else {}
  196. trim_set = {f for f, m in mask_dict.items() if m == 'trim'}
  197. non_trim_mask = {f: m for f, m in mask_dict.items() if m != 'trim'}
  198. # 已剔除 trim 字段的 column 列表,保持 PG 原顺序(attnum 升序)
  199. columns = [(r[1], r[2] or '') for r in full_rows if r[1] not in trim_set]
  200. pk_names = [r[1] for r in full_rows if r[4] == 'PK']
  201. pk = pk_names[0] if len(pk_names) == 1 and pk_names[0] not in trim_set else ''
  202. md_content = render_schema_md(full_rows, mask_dict)
  203. ini_content = render_template(args.ds, database, schema, table, columns, pk, non_trim_mask)
  204. if args.o is None:
  205. # stdout:先 md 表后 ini 模板
  206. sys.stdout.write(md_content)
  207. sys.stdout.write('\n')
  208. sys.stdout.write(ini_content)
  209. else:
  210. os.makedirs(args.o, exist_ok=True)
  211. md_path = os.path.join(args.o, table + '.md')
  212. ini_path = os.path.join(args.o, table + '.ini')
  213. with open(md_path, 'w', encoding='utf-8') as f:
  214. f.write(md_content)
  215. with open(ini_path, 'w', encoding='utf-8') as f:
  216. f.write(ini_content)
  217. print('已写入: ' + md_path, file=sys.stderr)
  218. print('已写入: ' + ini_path, file=sys.stderr)
  219. if __name__ == '__main__':
  220. main()