Bläddra i källkod

feat(bin): sync gen 加表探查(行数估值 + 锚点非空率 + 软删字段命中)

- pg_class.reltuples 估行数(避免大表 count(*) 全扫)
- 硬编码探 create_time / update_time 存在性 + 抽样非空率
  (TABLESAMPLE SYSTEM(1) LIMIT 1000,按存储页跳跃,亿级表毫秒级)
  非空率 < 95% 标 ⚠
- 软删字段:attname 含 'del' 子串全部命中(含噪声待人工筛)
- md 输出布局:### 探查 -> ### 字段
- 探查走默认行为,无开关;锚点 / 软删字段名硬编码(推行后端标准)
- 单测后续补

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 6 dagar sedan
förälder
incheckning
40b5c63b08
1 ändrade filer med 99 tillägg och 6 borttagningar
  1. 99 6
      bin/datax-sync-template-gen.py

+ 99 - 6
bin/datax-sync-template-gen.py

@@ -1,12 +1,13 @@
 #!/usr/bin/env /usr/bin/python3
 # -*- coding:utf-8 -*-
 """
-PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表。
+PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表 + 表探查
 
-一次跑同时产出两件:
-  1. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/
-     裁剪类型空列)—— 用于 kb/24 raw 建模文档讨论字段裁剪
-  2. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where /
+一次跑同时产出三件:
+  1. PG 表探查段(行数估值 + 锚点字段维护质量 + 软删字段命中),落 md 头部
+  2. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/
+     脱敏类型)—— 用于 kb/24 raw 建模文档
+  3. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where /
      加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/
 
 CLI:
@@ -45,6 +46,13 @@ WORKSPACE_DEFAULT = os.path.join(
     project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
 )
 
+# 探查硬编码:增量同步标准锚点字段(推行后端命名标准)
+ANCHOR_FIELDS = ('create_time', 'update_time')
+# 抽样上限:TABLESAMPLE SYSTEM(1) 按存储页跳跃后再 LIMIT 截断
+PROBE_SAMPLE_LIMIT = 1000
+# 锚点非空率告警阈值:低于此值认为业务方维护不全
+ANCHOR_NOTNULL_WARN_PCT = 95.0
+
 
 def resolve_datasource(ds_ref):
     """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
@@ -97,6 +105,88 @@ def query_columns_full(conn, schema, table):
     return cur.fetchall()
 
 
+def probe_table(conn, schema, table, full_rows):
+    """对表做行数估值 + 锚点抽样非空率 + 软删字段命中。
+
+    - 行数:pg_class.reltuples 估值(autovacuum 后准,避免大表 count(*) 全扫)
+    - 锚点字段:硬编码 create_time / update_time,存在性 + 抽样非空率
+      (TABLESAMPLE SYSTEM(1) LIMIT 1000,按存储页跳跃采样,亿级表毫秒级)
+    - 软删字段:从 full_rows 字段名筛 'del' 子串(不区分大小写)
+
+    返回 dict {
+        'reltuples': int,
+        'sample_total': int,
+        'anchor': {col: {'exists': bool, 'notnull': int or None}, ...},
+        'del_candidates': [str, ...],
+    }
+    """
+    cur = conn.cursor()
+
+    cur.execute("""
+        SELECT c.reltuples::bigint
+        FROM pg_catalog.pg_class c
+        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
+        WHERE n.nspname = %s AND c.relname = %s
+    """, (schema, table))
+    row = cur.fetchone()
+    reltuples = int(row[0]) if row and row[0] is not None else 0
+
+    field_names = {r[1] for r in full_rows}
+    anchor = {col: {'exists': col in field_names, 'notnull': None}
+              for col in ANCHOR_FIELDS}
+
+    sample_total = 0
+    present = [c for c in ANCHOR_FIELDS if anchor[c]['exists']]
+    if present:
+        notnull_select = ', '.join('count("{}")'.format(c) for c in present)
+        sql = (
+            'SELECT count(*), {nn} FROM '
+            '(SELECT * FROM "{schema}"."{table}" '
+            ' TABLESAMPLE SYSTEM(1) LIMIT {lim}) AS sub'
+        ).format(nn=notnull_select, schema=schema, table=table,
+                 lim=PROBE_SAMPLE_LIMIT)
+        cur.execute(sql)
+        result = cur.fetchone()
+        sample_total = int(result[0])
+        for i, c in enumerate(present):
+            anchor[c]['notnull'] = int(result[i + 1])
+
+    del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
+
+    return {
+        'reltuples': reltuples,
+        'sample_total': sample_total,
+        'anchor': anchor,
+        'del_candidates': del_candidates,
+    }
+
+
+def render_probe_md(stats):
+    """渲染探查段 markdown。"""
+    lines = ['### 探查', '']
+    lines.append('- 行数估值(pg_class.reltuples):{:,}'.format(stats['reltuples']))
+    lines.append('- 锚点字段:')
+    total = stats['sample_total']
+    for col in ANCHOR_FIELDS:
+        s = stats['anchor'][col]
+        if not s['exists']:
+            lines.append('  - `{}`:**缺失** ⚠'.format(col))
+        elif total > 0 and s['notnull'] is not None:
+            nn = s['notnull']
+            pct = 100.0 * nn / total
+            warn = ' ⚠' if pct < ANCHOR_NOTNULL_WARN_PCT else ''
+            lines.append('  - `{}`:存在;抽样 {} 行非空 {} ({:.1f}%){}'.format(
+                col, total, nn, pct, warn))
+        else:
+            lines.append('  - `{}`:存在;抽样无数据'.format(col))
+    if stats['del_candidates']:
+        lines.append('- 软删字段(含 `del` 子串):' + ', '.join(
+            '`{}`'.format(c) for c in stats['del_candidates']))
+    else:
+        lines.append('- 软删字段(含 `del` 子串):**未命中** ⚠')
+    return '\n'.join(lines) + '\n'
+
+
 def _resolve_to_project_root(path):
     """相对路径按项目根解析,绝对路径原样返回。
 
@@ -240,6 +330,7 @@ def main():
         full_rows = query_columns_full(conn, schema, table)
         if not full_rows:
             raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
+        probe_stats = probe_table(conn, schema, table, full_rows)
     finally:
         conn.close()
 
@@ -266,7 +357,9 @@ def main():
     pk_names = [r[1] for r in full_rows if r[4] == 'PK']
     pk = pk_names[0] if len(pk_names) == 1 and pk_names[0] not in trim_set else ''
 
-    md_content = render_schema_md(full_rows, mask_dict)
+    probe_md = render_probe_md(probe_stats)
+    schema_md = render_schema_md(full_rows, mask_dict)
+    md_content = probe_md + '\n### 字段\n\n' + schema_md
     ini_content = render_template(args.ds, database, schema, table, columns, pk, non_trim_mask)
 
     # stdout 始终打印(先 md 表后 ini 模板),传 -o 时再额外落盘