datax-sync-template-gen.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
PG → HDFS DataX sync ini template generator + raw-layer metadata table + table probe.

A single run produces three artifacts:

1. PG table probe section (row-count estimate + anchor-field maintenance quality +
   soft-delete column hits), placed at the top of the md
2. PG all-column metadata markdown table (ordinal / column name / Chinese name /
   data type / PK flag / mask type) -- used for the kb/24 raw modeling document
3. All-column sync ini template -- developers trim columns per the md discussion,
   adjust where, add [mask], tune splitPk, change the writer.path table-name suffix,
   etc., then commit it to jobs/raw/{domain}/

CLI:
    python3 bin/datax-sync-template-gen.py \\
        -ds postgresql/prod-hobby \\
        -t public.card_group_order_info \\
        [-mask-conf <PATH>] [-o [DIR]]

Arguments:
    -ds         data-source ref of the form {db_type}/{env}-{instance alias}
                (same format as the dataSource field in a sync ini).
                Only postgresql is supported for now.
    -t          schema-qualified table name (e.g. public.card_group_order_info).
    -mask-conf  path to the mask config ini ({table}.mask.ini, optional). When given,
                trim columns are removed per the config, the [mask] section is rendered,
                and the mask-type column in the md is filled in; when omitted, all
                columns are emitted and the mask-type column stays blank.
                **A missing file is a hard error.**
    -o          output directory (optional; stdout always prints md + ini in all
                three modes):
                - omitted: stdout only
                - -o without a value: stdout + write workspace/{yyyymmdd}/{table}.{md,ini}
                - -o <DIR>: stdout + write <DIR>/{table}.{md,ini}
"""

import argparse
import os
import re
import sys
from configparser import ConfigParser
from datetime import datetime

project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)

from dw_base.datax.datasources.data_source_factory import DataSourceFactory
from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL

WORKSPACE_DEFAULT = os.path.join(
    project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
)

# Hard-coded for the probe: standard anchor fields for incremental sync
# (the backend naming standard being rolled out)
ANCHOR_FIELDS = ('create_time', 'update_time')
# Sampling cap: TABLESAMPLE SYSTEM(1) skips by storage page, then LIMIT truncates
PROBE_SAMPLE_LIMIT = 1000


def resolve_datasource(ds_ref):
    """Reuse the ref → DataSource resolution logic from plugin.py:34-42.

    ds_ref looks like 'postgresql/prod-hobby'; the first segment is the db_type
    (same as its parent directory name).
    The datasource ini lives next to the project: ../datasource/{ds_ref}.ini.
    """
    ds_type = ds_ref.split('/')[0]
    if ds_type != 'postgresql':
        raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
    ds_file_path = os.path.normpath(
        os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
    if not os.path.isfile(ds_file_path):
        raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
    return DataSourceFactory.get_data_source(ds_type, ds_file_path)


def parse_jdbc_url(jdbc_url):
    """Extract (host, port, database) from jdbc:postgresql://host:port/database."""
    m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
    if not m:
        raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
    return m.group(1), int(m.group(2) or 5432), m.group(3)
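
# Illustrative examples (hypothetical hosts/databases); the port is optional and
# falls back to 5432 when missing from the URL:
#   parse_jdbc_url('jdbc:postgresql://10.1.2.3/hobby')       -> ('10.1.2.3', 5432, 'hobby')
#   parse_jdbc_url('jdbc:postgresql://10.1.2.3:6432/hobby')  -> ('10.1.2.3', 6432, 'hobby')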


def query_columns_full(conn, schema, table):
    """All-column metadata query with ordinal / type / PK flag, ordered by attnum.

    Returns [(attnum, attname, comment, pg_type, pk_flag), ...]
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT
            a.attnum,
            a.attname,
            pg_catalog.col_description(a.attrelid, a.attnum),
            pg_catalog.format_type(a.atttypid, a.atttypmod),
            CASE WHEN EXISTS (
                SELECT 1 FROM pg_index i
                WHERE i.indrelid = a.attrelid AND i.indisprimary
                  AND a.attnum = ANY(i.indkey)
            ) THEN 'PK' ELSE '' END
        FROM pg_catalog.pg_attribute a
        JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE n.nspname = %s AND c.relname = %s
          AND a.attnum > 0 AND NOT a.attisdropped
        ORDER BY a.attnum
    """, (schema, table))
    return cur.fetchall()
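
# Illustrative return value (hypothetical table): one tuple per live column, e.g.
#   (1, 'id', '主键', 'bigint', 'PK'),
#   (2, 'update_time', '更新时间', 'timestamp without time zone', '')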


def probe_table(conn, schema, table, full_rows):
    """Probe the table: row-count estimate + PK + anchor sampling + recent update_time
    + create_time range in PK order + soft-delete hits.

    - row count: pg_class.reltuples estimate
    - PK: single / composite / none + whether it is auto-increment
      (IDENTITY column or nextval default)
    - anchors: create_time / update_time existence + sampled not-null rate
      (TABLESAMPLE SYSTEM(1) LIMIT 1000)
    - recent update_time not-null rate: only when single PK + update_time exists;
      ORDER BY pk DESC LIMIT 1000
    - create_time range in PK order: only when single auto-increment PK + create_time
      exists; ORDER BY pk ASC/DESC LIMIT 1
    - soft delete: filter full_rows for the 'del' substring (case-insensitive)

    For the returned dict see the fields referenced in render_probe_md.
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT c.reltuples::bigint
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE n.nspname = %s AND c.relname = %s
    """, (schema, table))
    row = cur.fetchone()
    reltuples = int(row[0]) if row and row[0] is not None else 0

    pk_cols = [r[1] for r in full_rows if r[4] == 'PK']
    pk_auto_increment = False
    if len(pk_cols) == 1:
        # pg_get_serial_sequence only recognizes sequences linked via OWNED BY --
        # sequences created by hand in business databases lack that marker and would
        # be missed, so instead check attidentity (PG 10+ IDENTITY columns) and the
        # default expression (anything containing nextval counts as auto-increment).
        cur.execute("""
            SELECT a.attidentity, pg_get_expr(ad.adbin, ad.adrelid)
            FROM pg_attribute a
            LEFT JOIN pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum
            JOIN pg_class c ON c.oid = a.attrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE n.nspname = %s AND c.relname = %s AND a.attname = %s
        """, (schema, table, pk_cols[0]))
        r = cur.fetchone()
        if r:
            attidentity, default_expr = r[0], r[1]
            pk_auto_increment = (
                attidentity in ('a', 'd')
                or (default_expr is not None and 'nextval' in default_expr.lower())
            )

    field_names = {r[1] for r in full_rows}
    anchor = {col: {'exists': col in field_names, 'notnull': None}
              for col in ANCHOR_FIELDS}
    sample_total = 0
    present = [c for c in ANCHOR_FIELDS if anchor[c]['exists']]
    if present:
        notnull_select = ', '.join('count("{}")'.format(c) for c in present)
        sql = (
            'SELECT count(*), {nn} FROM '
            '(SELECT * FROM "{schema}"."{table}" '
            ' TABLESAMPLE SYSTEM(1) LIMIT {lim}) AS sub'
        ).format(nn=notnull_select, schema=schema, table=table,
                 lim=PROBE_SAMPLE_LIMIT)
        cur.execute(sql)
        result = cur.fetchone()
        sample_total = int(result[0])
        for i, c in enumerate(present):
            anchor[c]['notnull'] = int(result[i + 1])

    recent_total = None
    recent_update_notnull = None
    if len(pk_cols) == 1 and anchor['update_time']['exists']:
        sql = (
            'SELECT count(*), count("update_time") FROM '
            '(SELECT update_time FROM "{schema}"."{table}" '
            ' ORDER BY "{pk}" DESC LIMIT {lim}) AS sub'
        ).format(schema=schema, table=table, pk=pk_cols[0],
                 lim=PROBE_SAMPLE_LIMIT)
        cur.execute(sql)
        result = cur.fetchone()
        recent_total = int(result[0])
        recent_update_notnull = int(result[1])

    create_time_earliest = None
    create_time_latest = None
    if pk_auto_increment and anchor['create_time']['exists']:
        sql = (
            'SELECT '
            '(SELECT create_time FROM "{schema}"."{table}" '
            ' ORDER BY "{pk}" ASC LIMIT 1), '
            '(SELECT create_time FROM "{schema}"."{table}" '
            ' ORDER BY "{pk}" DESC LIMIT 1)'
        ).format(schema=schema, table=table, pk=pk_cols[0])
        cur.execute(sql)
        result = cur.fetchone()
        create_time_earliest = result[0]
        create_time_latest = result[1]

    del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
    return {
        'reltuples': reltuples,
        'pk_cols': pk_cols,
        'pk_auto_increment': pk_auto_increment,
        'sample_total': sample_total,
        'anchor': anchor,
        'recent_total': recent_total,
        'recent_update_notnull': recent_update_notnull,
        'create_time_earliest': create_time_earliest,
        'create_time_latest': create_time_latest,
        'del_candidates': del_candidates,
    }
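
# Illustrative return value (hypothetical numbers), consumed by render_probe_md below:
#   {'reltuples': 1234567, 'pk_cols': ['id'], 'pk_auto_increment': True,
#    'sample_total': 1000,
#    'anchor': {'create_time': {'exists': True, 'notnull': 1000},
#               'update_time': {'exists': True, 'notnull': 987}},
#    'recent_total': 1000, 'recent_update_notnull': 998,
#    'create_time_earliest': ..., 'create_time_latest': ...,  # datetime values
#    'del_candidates': ['is_deleted']}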


def render_probe_md(stats):
    """Render the probe section as markdown."""
    lines = ['### 探查', '']
    lines.append('- 行数估值(pg_class.reltuples):{:,}'.format(stats['reltuples']))
    pk_cols = stats['pk_cols']
    if not pk_cols:
        pk_desc = '无(DataX channel 无法并行)'
    elif len(pk_cols) > 1:
        pk_desc = '复合 ({}) (DataX splitPk 不支持复合,退串行)'.format(
            ', '.join('`{}`'.format(c) for c in pk_cols))
    elif stats['pk_auto_increment']:
        pk_desc = '`{}`(自增)'.format(pk_cols[0])
    else:
        pk_desc = '`{}`(非自增,DataX channel 切分分布可能不均)'.format(pk_cols[0])
    lines.append('- 主键:' + pk_desc)
    lines.append('- 锚点字段:')
    total = stats['sample_total']
    for col in ANCHOR_FIELDS:
        s = stats['anchor'][col]
        if not s['exists']:
            lines.append('  - `{}`:缺失'.format(col))
            continue
        if total > 0 and s['notnull'] is not None:
            pct = 100.0 * s['notnull'] / total
            base = '`{}`:存在;整体非空率 {:.1f}% ({}/{} 抽样)'.format(
                col, pct, s['notnull'], total)
        else:
            base = '`{}`:存在;抽样无数据'.format(col)
        if col == 'create_time' and stats['create_time_earliest']:
            base += ';按主键序范围 {} ~ {}'.format(
                stats['create_time_earliest'], stats['create_time_latest'])
        lines.append('  - ' + base)
        if col == 'update_time' and stats['recent_total'] is not None:
            rt = stats['recent_total']
            rnn = stats['recent_update_notnull']
            rpct = 100.0 * rnn / rt if rt else 0.0
            lines.append('    - 近期非空率 {:.1f}% ({}/{} 最近 1000 行)'.format(
                rpct, rnn, rt))
    if stats['del_candidates']:
        lines.append('- 软删字段(含 `del` 子串):' + ', '.join(
            '`{}`'.format(c) for c in stats['del_candidates']))
    else:
        lines.append('- 软删字段(含 `del` 子串):未命中')
    return '\n'.join(lines) + '\n'
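
# Illustrative rendering (hypothetical stats matching the example dict above):
#   ### 探查
#
#   - 行数估值(pg_class.reltuples):1,234,567
#   - 主键:`id`(自增)
#   - 锚点字段:
#     - `create_time`:存在;整体非空率 100.0% (1000/1000 抽样);按主键序范围 2019-01-01 00:03:07 ~ 2024-06-30 23:59:58
#     - `update_time`:存在;整体非空率 98.7% (987/1000 抽样)
#       - 近期非空率 99.8% (998/1000 最近 1000 行)
#   - 软删字段(含 `del` 子串):`is_deleted`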


def _resolve_to_project_root(path):
    """Resolve relative paths against the project root; return absolute paths as-is.

    Reuses the logic of dw_base.datax.entry._resolve_relative_to_base --
    relative resources such as the mask conf are found no matter which cwd the
    script is run from, consistent with the project's other bin entry points
    (datax-hive-import-starter etc.).
    """
    if os.path.isabs(path):
        return path
    return os.path.join(project_root, path)


def load_mask_conf(path):
    """Read the mask config ini and return a {field: method} dict.

    Format (same as jobs/raw/{域}/{table}.mask.ini):
        [mask]
        field1 = method1
        field2 = method2

    method ∈ trim / md5 / month_trunc / mask_middle / keep_first_n / keep_last_n
    - trim: the whole column is kept out of raw (not queried in the reader column list)
    - others: the column goes into raw and is masked on the reader side by dw_base.datax.mask

    A missing file raises FileNotFoundError (no silent fallback).
    """
    if not os.path.isfile(path):
        raise FileNotFoundError('mask 配置不存在: ' + path)
    cp = ConfigParser()
    cp.read(path, encoding='utf-8')
    if not cp.has_section('mask'):
        return {}
    return dict(cp.items('mask'))
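
# Illustrative {table}.mask.ini (hypothetical field names):
#   [mask]
#   id_card_no = trim
#   mobile = mask_middle
#   birthday = month_trunc
# load_mask_conf(...) would then return
#   {'id_card_no': 'trim', 'mobile': 'mask_middle', 'birthday': 'month_trunc'}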


def render_schema_md(rows, mask_dict=None):
    """Emit the markdown table: ordinal / column name / Chinese name / data type /
    PK flag / mask type.

    Without mask_dict the mask-type column stays blank; with it, each field's
    method (including trim) is filled in.
    """
    lines = [
        '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |',
        '| --- | --- | --- | --- | --- | --- |',
    ]
    methods = mask_dict or {}
    for num, name, comment, typ, pk in rows:
        method = methods.get(name, '')
        lines.append('| {} | `{}` | {} | {} | {} | {} |'.format(
            num, name, comment or '', typ, pk, method))
    return '\n'.join(lines) + '\n'
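
# Illustrative output row (hypothetical column), with mask_dict = {'mobile': 'mask_middle'}:
#   | 5 | `mobile` | 手机号 | character varying(20) |  | mask_middle |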


def render_template(ds_ref, database, schema, table, columns, pk, mask_methods=None):
    """Render the sync ini template.

    columns: [(name, comment), ...] with trim fields already removed, in original PG order
    mask_methods: {field: method} containing only non-trim methods (mask_middle /
        month_trunc etc.), rendered as the [mask] section; with an empty dict or None
        the [mask] section is omitted
    """
    column_str = ','.join(c for c, _ in columns)
    today = datetime.now().strftime('%Y-%m-%d')
    if mask_methods:
        mask_lines = '\n'.join('{} = {}'.format(f, m) for f, m in mask_methods.items())
        mask_section = '[mask]\n' + mask_lines + '\n\n'
    else:
        mask_section = ''
    return (
        '; 作者:<TODO>\n'
        '; 日期:{today}\n'
        '; 工单:<TODO>\n'
        '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
        '; 状态:[待执行]\n'
        '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 mask 段 /\n'
        '; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
        ';\n'
        '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
        '\n'
        '[reader]\n'
        'dataSource = {ds_ref}\n'
        'database = {database}\n'
        'table = {schema}.{table}\n'
        'column = {column_str}\n'
        'columnType =\n'
        "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
        'querySql =\n'
        'splitPk = {pk}\n'
        'fetchSize = 1000\n'
        '\n'
        '{mask_section}'
        '[writer]\n'
        'dataSource = hdfs/<TODO>\n'
        'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
        'column = {column_str}\n'
        'columnType =\n'
        'fileType = orc\n'
        'fileName = {table}_TODO_d\n'
        'encoding = UTF-8\n'
        'writeMode = truncate\n'
        'fieldDelimiter = \\t\n'
    ).format(
        today=today, ds_ref=ds_ref, database=database, schema=schema,
        table=table, column_str=column_str, pk=pk, mask_section=mask_section,
    )
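
# Illustrative [reader] excerpt of the rendered template (hypothetical database name
# and column list):
#   [reader]
#   dataSource = postgresql/prod-hobby
#   database = hobby
#   table = public.card_group_order_info
#   column = id,order_no,amount,create_time,update_time
#   splitPk = id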


def main():
    parser = argparse.ArgumentParser(
        prog='datax-sync-template-gen',
        description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
    )
    parser.add_argument('-ds', required=True, metavar='DS_REF',
                        help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
    parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
                        help='schema 限定的表名(如 public.card_group_order_info)')
    parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
                        help='输出目录(任意三态 stdout 始终打印 md + ini;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
    parser.add_argument('-mask-conf', default=None, metavar='PATH', dest='mask_conf',
                        help='mask 配置 ini 路径({table}.mask.ini)。传入时按配置剔除 trim 字段 + 渲染 [mask] 段,md 脱敏类型列填好;不传时全字段输出,md 脱敏类型列空白')
    args = parser.parse_args()

    if '.' not in args.t:
        print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
        sys.exit(2)
    schema, table = args.t.split('.', 1)

    ds = resolve_datasource(args.ds)
    ds_dict = ds.parse()
    jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
    user = ds_dict['username']
    password = ds_dict['password']
    host, port, database = parse_jdbc_url(jdbc_url)

    # imported lazily so that importing this module does not require the pg8000 driver
    import pg8000.dbapi
    conn = pg8000.dbapi.connect(
        host=host, port=port, database=database,
        user=user, password=password,
    )
    try:
        full_rows = query_columns_full(conn, schema, table)
        if not full_rows:
            raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
        probe_stats = probe_table(conn, schema, table, full_rows)
    finally:
        conn.close()

    # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
    if args.mask_conf:
        mask_path = _resolve_to_project_root(args.mask_conf)
        mask_dict = load_mask_conf(mask_path)
    else:
        mask_dict = {}

    # Warn on stderr (without aborting) when the mask config names fields that do
    # not exist in the table
    pg_field_set = {r[1] for r in full_rows}
    unknown_fields = [f for f in mask_dict if f not in pg_field_set]
    if unknown_fields:
        print('警告:mask 配置含表中不存在字段(已忽略): ' + ', '.join(unknown_fields),
              file=sys.stderr)

    trim_set = {f for f, m in mask_dict.items() if m == 'trim'}
    non_trim_mask = {f: m for f, m in mask_dict.items() if m != 'trim'}
    # Column list with trim fields removed, keeping the original PG order (ascending attnum)
    columns = [(r[1], r[2] or '') for r in full_rows if r[1] not in trim_set]
    pk_names = [r[1] for r in full_rows if r[4] == 'PK']
    pk = pk_names[0] if len(pk_names) == 1 and pk_names[0] not in trim_set else ''

    probe_md = render_probe_md(probe_stats)
    schema_md = render_schema_md(full_rows, mask_dict)
    md_content = probe_md + '\n### 字段\n\n' + schema_md
    ini_content = render_template(args.ds, database, schema, table, columns, pk, non_trim_mask)

    # Always print to stdout (md first, then the ini template); with -o also write to disk
    sys.stdout.write(md_content)
    sys.stdout.write('\n')
    sys.stdout.write(ini_content)

    if args.o is not None:
        os.makedirs(args.o, exist_ok=True)
        md_path = os.path.join(args.o, table + '.md')
        ini_path = os.path.join(args.o, table + '.ini')
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(md_content)
        with open(ini_path, 'w', encoding='utf-8') as f:
            f.write(ini_content)
        print('已写入: ' + md_path, file=sys.stderr)
        print('已写入: ' + ini_path, file=sys.stderr)


if __name__ == '__main__':
    main()
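
# Illustrative end-to-end invocation (the mask-conf path and its domain directory
# are hypothetical): prints the probe + field md and the trimmed/masked ini to
# stdout, and, because -o is given without a value, also writes
# workspace/{yyyymmdd}/card_group_order_info.{md,ini}:
#   python3 bin/datax-sync-template-gen.py \
#       -ds postgresql/prod-hobby \
#       -t public.card_group_order_info \
#       -mask-conf jobs/raw/card/card_group_order_info.mask.ini \
#       -o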