#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表 + 表探查。 一次跑同时产出三件: 1. PG 表探查段(行数估值 + 锚点字段维护质量 + 软删字段命中),落 md 头部 2. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/ 脱敏类型)—— 用于 kb/24 raw 建模文档 3. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where / 加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/ CLI: python3 bin/datax-sync-template-gen.py \\ -ds postgresql/prod-hobby \\ -t public.card_group_order_info \\ [-mask-conf ] [-o [DIR]] 参数: -ds 数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里 dataSource 字段格式)。暂只支持 postgresql。 -t schema 限定的表名(如 public.card_group_order_info)。 -mask-conf mask 配置 ini 路径({table}.mask.ini,可选)。传入时按配置 剔除 trim 字段 + 渲染 [mask] 段,md 脱敏类型列填好;不传时 全字段输出,md 脱敏类型列空白。**文件不存在直接报错**。 -o 输出目录(可选;任意三态下 stdout 都同时打印 md + ini): - 不传:仅 stdout - 传 -o 不带值:stdout + 落盘 workspace/{yyyymmdd}/{table}.{md,ini} - 传 -o :stdout + 落盘 /{table}.{md,ini} """ import argparse import os import re import sys from configparser import ConfigParser from datetime import datetime project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) from dw_base.datax.datasources.data_source_factory import DataSourceFactory from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL WORKSPACE_DEFAULT = os.path.join( project_root, 'workspace', datetime.now().strftime('%Y%m%d'), ) # 探查硬编码:增量同步标准锚点字段(推行后端命名标准) ANCHOR_FIELDS = ('create_time', 'update_time') # 抽样上限:TABLESAMPLE SYSTEM(1) 按存储页跳跃后再 LIMIT 截断 PROBE_SAMPLE_LIMIT = 1000 # 小表阈值:reltuples 低于此值直接走全表 count(毫秒级,避免 SYSTEM 抽样在小表上概率失败) PROBE_SMALL_TABLE_THRESHOLD = 100000 def resolve_datasource(ds_ref): """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。 ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。 datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。 """ ds_type = ds_ref.split('/')[0] if ds_type != 'postgresql': raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type) ds_file_path = os.path.normpath( os.path.join(project_root, '..', 'datasource', ds_ref + '.ini')) if not os.path.isfile(ds_file_path): raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path) return DataSourceFactory.get_data_source(ds_type, ds_file_path) def parse_jdbc_url(jdbc_url): """从 jdbc:postgresql://host:port/database 抽 (host, port, database)。""" m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url) if not m: raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url) return m.group(1), int(m.group(2) or 5432), m.group(3) def query_columns_full(conn, schema, table): """带序号 / 类型 / 主键标识的全字段 metadata 查询,按 attnum 排序。 返回 [(attnum, attname, comment, pg_type, pk_flag), ...] """ cur = conn.cursor() cur.execute(""" SELECT a.attnum, a.attname, pg_catalog.col_description(a.attrelid, a.attnum), pg_catalog.format_type(a.atttypid, a.atttypmod), CASE WHEN EXISTS ( SELECT 1 FROM pg_index i WHERE i.indrelid = a.attrelid AND i.indisprimary AND a.attnum = ANY(i.indkey) ) THEN 'PK' ELSE '' END FROM pg_catalog.pg_attribute a JOIN pg_catalog.pg_class c ON a.attrelid = c.oid JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = %s AND c.relname = %s AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum """, (schema, table)) return cur.fetchall() def probe_table(conn, schema, table, full_rows): """对表做行数 + PK + 锚点(主键序范围 / update_time 抽样非空)+ 软删命中。 - 行数:pg_class.reltuples 估值 - PK:单/复合/无 + 是否自增(attidentity + default 表达式 nextval 双判) - create_time 主键序范围:单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1 - update_time 抽样:TABLESAMPLE SYSTEM(1) LIMIT 1000,非空数 > 0 即视为业务方已启用 (不取最早非空时间——全表 min 在大表上慢;启用日期对决策意义不大, 只关心"是否启用"二元值) - 软删:full_rows 筛 'del' 子串(不区分大小写) """ cur = conn.cursor() cur.execute(""" SELECT c.reltuples::bigint FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = %s AND c.relname = %s """, (schema, table)) row = cur.fetchone() reltuples = int(row[0]) if row and row[0] is not None else 0 pk_cols = [r[1] for r in full_rows if r[4] == 'PK'] pk_auto_increment = False if len(pk_cols) == 1: cur.execute(""" SELECT a.attidentity, pg_get_expr(ad.adbin, ad.adrelid) FROM pg_attribute a LEFT JOIN pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum JOIN pg_class c ON c.oid = a.attrelid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = %s AND c.relname = %s AND a.attname = %s """, (schema, table, pk_cols[0])) r = cur.fetchone() if r: attidentity, default_expr = r[0], r[1] pk_auto_increment = ( attidentity in ('a', 'd') or (default_expr is not None and 'nextval' in default_expr.lower()) ) field_names = {r[1] for r in full_rows} create_exists = 'create_time' in field_names update_exists = 'update_time' in field_names create_earliest = None create_latest = None if pk_auto_increment and create_exists: cur.execute( 'SELECT ' '(SELECT create_time FROM "{s}"."{t}" ORDER BY "{p}" ASC LIMIT 1), ' '(SELECT create_time FROM "{s}"."{t}" ORDER BY "{p}" DESC LIMIT 1)' .format(s=schema, t=table, p=pk_cols[0])) r = cur.fetchone() create_earliest, create_latest = r[0], r[1] update_sample_total = 0 update_sample_notnull = 0 if update_exists: if reltuples < PROBE_SMALL_TABLE_THRESHOLD: # 小表全表 count,避开 SYSTEM 抽样在小表上的概率失败 cur.execute('SELECT count(*), count("update_time") FROM "{s}"."{t}"' .format(s=schema, t=table)) r = cur.fetchone() update_sample_total = int(r[0]) update_sample_notnull = int(r[1]) else: # 大表 SYSTEM(1) LIMIT 1000 抽样,3 次 retry 避开偶发 0 行 for _ in range(3): cur.execute( 'SELECT count(*), count("update_time") FROM ' '(SELECT update_time FROM "{s}"."{t}" ' ' TABLESAMPLE SYSTEM(1) LIMIT {n}) AS sub' .format(s=schema, t=table, n=PROBE_SAMPLE_LIMIT)) r = cur.fetchone() update_sample_total = int(r[0]) update_sample_notnull = int(r[1]) if update_sample_total > 0: break del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower()) return { 'reltuples': reltuples, 'pk_cols': pk_cols, 'pk_auto_increment': pk_auto_increment, 'create_exists': create_exists, 'create_earliest': create_earliest, 'create_latest': create_latest, 'update_exists': update_exists, 'update_sample_total': update_sample_total, 'update_sample_notnull': update_sample_notnull, 'del_candidates': del_candidates, } def render_probe_md(stats): """渲染探查段 markdown。""" lines = ['### 探查', ''] lines.append('- 行数估值(pg_class.reltuples):{:,}'.format(stats['reltuples'])) pk_cols = stats['pk_cols'] if not pk_cols: pk_desc = '无(DataX channel 无法并行)' elif len(pk_cols) > 1: pk_desc = '复合 ({}) (DataX splitPk 不支持复合,退串行)'.format( ', '.join('`{}`'.format(c) for c in pk_cols)) elif stats['pk_auto_increment']: pk_desc = '`{}`(自增)'.format(pk_cols[0]) else: pk_desc = '`{}`(非自增,DataX channel 切分分布可能不均)'.format(pk_cols[0]) lines.append('- 主键:' + pk_desc) lines.append('- 锚点字段:') if not stats['create_exists']: lines.append(' - `create_time`:缺失') elif stats['create_earliest']: lines.append(' - `create_time`:存在;按主键序范围 {} ~ {}'.format( stats['create_earliest'], stats['create_latest'])) else: lines.append(' - `create_time`:存在') if not stats['update_exists']: lines.append(' - `update_time`:缺失') else: nn = stats['update_sample_notnull'] total = stats['update_sample_total'] if total == 0: tail = '抽样无数据' elif nn > 0: tail = '抽样 {} 行 {} 行非空(已启用)'.format(total, nn) else: tail = '抽样 {} 行全 NULL(猜测未启用,需对账)'.format(total) lines.append(' - `update_time`:存在;' + tail) if stats['del_candidates']: lines.append('- 软删字段(含 `del` 子串):' + ', '.join( '`{}`'.format(c) for c in stats['del_candidates'])) else: lines.append('- 软删字段(含 `del` 子串):未命中') return '\n'.join(lines) + '\n' def _resolve_to_project_root(path): """相对路径按项目根解析,绝对路径原样返回。 复用 dw_base.datax.entry._resolve_relative_to_base 的逻辑—— 任何 cwd 跑此脚本都能找到 mask conf 等相对路径资源, 与项目其他 bin 入口(datax-hive-import-starter 等)行为一致。 """ if os.path.isabs(path): return path return os.path.join(project_root, path) def load_mask_conf(path): """读 mask 配置 ini,返回 {field: method} dict。 格式(与 jobs/raw/{域}/{table}.mask.ini 同款): [mask] field1 = method1 field2 = method2 method ∈ trim / md5 / month_trunc / mask_middle / keep_first_n / keep_last_n - trim:整字段不入 raw(reader column 不查询) - 其他:字段入 raw,由 dw_base.datax.mask 在 reader 端脱敏 文件不存在直接 raise FileNotFoundError(不静默失败)。 """ if not os.path.isfile(path): raise FileNotFoundError('mask 配置不存在: ' + path) cp = ConfigParser() cp.read(path, encoding='utf-8') if not cp.has_section('mask'): return {} return dict(cp.items('mask')) def render_schema_md(rows, mask_dict=None): """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 脱敏类型。 mask_dict 不传时脱敏类型列为空白;传入时填字段对应的 method(含 trim)。 """ lines = [ '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |', '| --- | --- | --- | --- | --- | --- |', ] methods = mask_dict or {} for num, name, comment, typ, pk in rows: method = methods.get(name, '') lines.append('| {} | `{}` | {} | {} | {} | {} |'.format( num, name, comment or '', typ, pk, method)) return '\n'.join(lines) + '\n' def render_template(ds_ref, database, schema, table, columns, pk, mask_methods=None): """渲染 sync ini 模板。 columns: [(name, comment), ...] 已剔除 trim 字段,保持 PG 原顺序 mask_methods: {field: method} 仅含非 trim 方法(mask_middle / month_trunc 等), 渲染 [mask] 段;空 dict 或 None 时不渲染 [mask] 段 """ column_str = ','.join(c for c, _ in columns) today = datetime.now().strftime('%Y-%m-%d') if mask_methods: mask_lines = '\n'.join('{} = {}'.format(f, m) for f, m in mask_methods.items()) mask_section = '[mask]\n' + mask_lines + '\n\n' else: mask_section = '' return ( '; 作者:\n' '; 日期:{today}\n' '; 工单:\n' '; 目的:PG {database}.{schema}.{table} → Hive raw. 同步模板\n' '; 状态:[待执行]\n' '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 mask 段 /\n' '; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n' ';\n' '; 配套 DDL:manual/ddl/raw//raw__create.sql\n' '\n' '[reader]\n' 'dataSource = {ds_ref}\n' 'database = {database}\n' 'table = {schema}.{table}\n' 'column = {column_str}\n' 'columnType =\n' "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n" 'querySql =\n' 'splitPk = {pk}\n' 'fetchSize = 1000\n' '\n' '{mask_section}' '[writer]\n' 'dataSource = hdfs/\n' 'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n' 'column = {column_str}\n' 'columnType =\n' 'fileType = orc\n' 'fileName = {table}_TODO_d\n' 'encoding = UTF-8\n' 'writeMode = truncate\n' 'fieldDelimiter = \\t\n' ).format( today=today, ds_ref=ds_ref, database=database, schema=schema, table=table, column_str=column_str, pk=pk, mask_section=mask_section, ) def main(): parser = argparse.ArgumentParser( prog='datax-sync-template-gen', description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)', ) parser.add_argument('-ds', required=True, metavar='DS_REF', help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)') parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE', help='schema 限定的表名(如 public.card_group_order_info)') parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR', help='输出目录(任意三态 stdout 始终打印 md + ini;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 /)') parser.add_argument('-mask-conf', default=None, metavar='PATH', dest='mask_conf', help='mask 配置 ini 路径({table}.mask.ini)。传入时按配置剔除 trim 字段 + 渲染 [mask] 段,md 脱敏类型列填好;不传时全字段输出,md 脱敏类型列空白') args = parser.parse_args() if '.' not in args.t: print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr) sys.exit(2) schema, table = args.t.split('.', 1) ds = resolve_datasource(args.ds) ds_dict = ds.parse() jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL] user = ds_dict['username'] password = ds_dict['password'] host, port, database = parse_jdbc_url(jdbc_url) import pg8000.dbapi conn = pg8000.dbapi.connect( host=host, port=port, database=database, user=user, password=password, ) try: full_rows = query_columns_full(conn, schema, table) if not full_rows: raise ValueError('表不存在或无字段: {}.{}'.format(schema, table)) probe_stats = probe_table(conn, schema, table, full_rows) finally: conn.close() # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...] if args.mask_conf: mask_path = _resolve_to_project_root(args.mask_conf) mask_dict = load_mask_conf(mask_path) else: mask_dict = {} # mask 配置含表中不存在字段时 stderr 警告(不阻断) pg_field_set = {r[1] for r in full_rows} unknown_fields = [f for f in mask_dict if f not in pg_field_set] if unknown_fields: print('警告:mask 配置含表中不存在字段(已忽略): ' + ', '.join(unknown_fields), file=sys.stderr) trim_set = {f for f, m in mask_dict.items() if m == 'trim'} non_trim_mask = {f: m for f, m in mask_dict.items() if m != 'trim'} # 已剔除 trim 字段的 column 列表,保持 PG 原顺序(attnum 升序) columns = [(r[1], r[2] or '') for r in full_rows if r[1] not in trim_set] pk_names = [r[1] for r in full_rows if r[4] == 'PK'] pk = pk_names[0] if len(pk_names) == 1 and pk_names[0] not in trim_set else '' probe_md = render_probe_md(probe_stats) schema_md = render_schema_md(full_rows, mask_dict) md_content = probe_md + '\n### 字段\n\n' + schema_md ini_content = render_template(args.ds, database, schema, table, columns, pk, non_trim_mask) # stdout 始终打印(先 md 表后 ini 模板),传 -o 时再额外落盘 sys.stdout.write(md_content) sys.stdout.write('\n') sys.stdout.write(ini_content) if args.o is not None: os.makedirs(args.o, exist_ok=True) md_path = os.path.join(args.o, table + '.md') ini_path = os.path.join(args.o, table + '.ini') with open(md_path, 'w', encoding='utf-8') as f: f.write(md_content) with open(ini_path, 'w', encoding='utf-8') as f: f.write(ini_content) print('已写入: ' + md_path, file=sys.stderr) print('已写入: ' + ini_path, file=sys.stderr) if __name__ == '__main__': main()