Bladeren bron

feat(bin): sync gen 探查加 PK + 近期 update_time + 主键序时间范围

- PK 探查:单/复合/无 + 是否自增(attidentity + default 表达式 nextval 双判,
  避开 pg_get_serial_sequence 对未 OWNED BY 的 sequence 漏判)
- update_time 近期非空率:单 PK 时按 ORDER BY pk DESC LIMIT 1000 取最近行计数
- create_time 主键序时间范围:单列自增 PK 时按 ORDER BY pk ASC/DESC LIMIT 1
  取首末行 create_time,作为 backfill 范围参考(PK 索引 O(log N))
- 去掉 ⚠ 自动告警(脚本无法区分流水表与维护不全,由评审按业务语义判断)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 6 dagen geleden
bovenliggende
commit
64cc6ef7a2
1 gewijzigde bestanden met toevoegingen van 99 en 24 verwijderingen
  1. 99 24
      bin/datax-sync-template-gen.py

+ 99 - 24
bin/datax-sync-template-gen.py

@@ -50,8 +50,6 @@ WORKSPACE_DEFAULT = os.path.join(
 ANCHOR_FIELDS = ('create_time', 'update_time')
 # 抽样上限:TABLESAMPLE SYSTEM(1) 按存储页跳跃后再 LIMIT 截断
 PROBE_SAMPLE_LIMIT = 1000
-# 锚点非空率告警阈值:低于此值认为业务方维护不全
-ANCHOR_NOTNULL_WARN_PCT = 95.0
 
 
 def resolve_datasource(ds_ref):
@@ -106,19 +104,16 @@ def query_columns_full(conn, schema, table):
 
 
 def probe_table(conn, schema, table, full_rows):
-    """对表做行数估值 + 锚点抽样非空率 + 软删字段命中。
-
-    - 行数:pg_class.reltuples 估值(autovacuum 后准,避免大表 count(*) 全扫)
-    - 锚点字段:硬编码 create_time / update_time,存在性 + 抽样非空率
-      (TABLESAMPLE SYSTEM(1) LIMIT 1000,按存储页跳跃采样,亿级表毫秒级)
-    - 软删字段:从 full_rows 字段名筛 'del' 子串(不区分大小写)
-
-    返回 dict {
-        'reltuples': int,
-        'sample_total': int,
-        'anchor': {col: {'exists': bool, 'notnull': int or None}, ...},
-        'del_candidates': [str, ...],
-    }
+    """对表做行数估值 + PK + 锚点抽样 + 近期 update_time + 主键序时间范围 + 软删命中。
+
+    - 行数:pg_class.reltuples 估值
+    - PK:单/复合/无 + 是否自增(attidentity + default 表达式含 nextval,不用 pg_get_serial_sequence)
+    - 锚点:create_time / update_time 存在性 + 抽样非空率(TABLESAMPLE SYSTEM(1) LIMIT 1000)
+    - 近期 update_time 非空率:仅当单 PK + update_time 存在;ORDER BY pk DESC LIMIT 1000
+    - create_time 主键序范围:仅当单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1
+    - 软删:full_rows 筛 'del' 子串(不区分大小写)
+
+    返回 dict 见 render_probe_md 引用字段。
     """
     cur = conn.cursor()
 
@@ -131,6 +126,28 @@ def probe_table(conn, schema, table, full_rows):
     row = cur.fetchone()
     reltuples = int(row[0]) if row and row[0] is not None else 0
 
+    pk_cols = [r[1] for r in full_rows if r[4] == 'PK']
+    pk_auto_increment = False
+    if len(pk_cols) == 1:
+        # pg_get_serial_sequence 只识别 OWNED BY 关联的 sequence——
+        # 业务库手工建的 sequence 没 OWNED BY 标记会漏判,所以同时查 attidentity
+        # (PG 10+ IDENTITY 列)和 default 表达式(含 nextval 即视为自增)。
+        cur.execute("""
+            SELECT a.attidentity, pg_get_expr(ad.adbin, ad.adrelid)
+            FROM pg_attribute a
+            LEFT JOIN pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum
+            JOIN pg_class c ON c.oid = a.attrelid
+            JOIN pg_namespace n ON n.oid = c.relnamespace
+            WHERE n.nspname = %s AND c.relname = %s AND a.attname = %s
+        """, (schema, table, pk_cols[0]))
+        r = cur.fetchone()
+        if r:
+            attidentity, default_expr = r[0], r[1]
+            pk_auto_increment = (
+                attidentity in ('a', 'd')
+                or (default_expr is not None and 'nextval' in default_expr.lower())
+            )
+
     field_names = {r[1] for r in full_rows}
     anchor = {col: {'exists': col in field_names, 'notnull': None}
               for col in ANCHOR_FIELDS}
@@ -151,12 +168,47 @@ def probe_table(conn, schema, table, full_rows):
         for i, c in enumerate(present):
             anchor[c]['notnull'] = int(result[i + 1])
 
+    recent_total = None
+    recent_update_notnull = None
+    if len(pk_cols) == 1 and anchor['update_time']['exists']:
+        sql = (
+            'SELECT count(*), count("update_time") FROM '
+            '(SELECT update_time FROM "{schema}"."{table}" '
+            ' ORDER BY "{pk}" DESC LIMIT {lim}) AS sub'
+        ).format(schema=schema, table=table, pk=pk_cols[0],
+                 lim=PROBE_SAMPLE_LIMIT)
+        cur.execute(sql)
+        result = cur.fetchone()
+        recent_total = int(result[0])
+        recent_update_notnull = int(result[1])
+
+    create_time_earliest = None
+    create_time_latest = None
+    if pk_auto_increment and anchor['create_time']['exists']:
+        sql = (
+            'SELECT '
+            '(SELECT create_time FROM "{schema}"."{table}" '
+            ' ORDER BY "{pk}" ASC LIMIT 1), '
+            '(SELECT create_time FROM "{schema}"."{table}" '
+            ' ORDER BY "{pk}" DESC LIMIT 1)'
+        ).format(schema=schema, table=table, pk=pk_cols[0])
+        cur.execute(sql)
+        result = cur.fetchone()
+        create_time_earliest = result[0]
+        create_time_latest = result[1]
+
     del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
 
     return {
         'reltuples': reltuples,
+        'pk_cols': pk_cols,
+        'pk_auto_increment': pk_auto_increment,
         'sample_total': sample_total,
         'anchor': anchor,
+        'recent_total': recent_total,
+        'recent_update_notnull': recent_update_notnull,
+        'create_time_earliest': create_time_earliest,
+        'create_time_latest': create_time_latest,
         'del_candidates': del_candidates,
     }
 
@@ -165,25 +217,48 @@ def render_probe_md(stats):
     """渲染探查段 markdown。"""
     lines = ['### 探查', '']
     lines.append('- 行数估值(pg_class.reltuples):{:,}'.format(stats['reltuples']))
+
+    pk_cols = stats['pk_cols']
+    if not pk_cols:
+        pk_desc = '无(DataX channel 无法并行)'
+    elif len(pk_cols) > 1:
+        pk_desc = '复合 ({}) (DataX splitPk 不支持复合,退串行)'.format(
+            ', '.join('`{}`'.format(c) for c in pk_cols))
+    elif stats['pk_auto_increment']:
+        pk_desc = '`{}`(自增)'.format(pk_cols[0])
+    else:
+        pk_desc = '`{}`(非自增,DataX channel 切分分布可能不均)'.format(pk_cols[0])
+    lines.append('- 主键:' + pk_desc)
+
     lines.append('- 锚点字段:')
     total = stats['sample_total']
     for col in ANCHOR_FIELDS:
         s = stats['anchor'][col]
         if not s['exists']:
-            lines.append('  - `{}`:**缺失** ⚠'.format(col))
-        elif total > 0 and s['notnull'] is not None:
-            nn = s['notnull']
-            pct = 100.0 * nn / total
-            warn = ' ⚠' if pct < ANCHOR_NOTNULL_WARN_PCT else ''
-            lines.append('  - `{}`:存在;抽样 {} 行非空 {} ({:.1f}%){}'.format(
-                col, total, nn, pct, warn))
+            lines.append('  - `{}`:缺失'.format(col))
+            continue
+        if total > 0 and s['notnull'] is not None:
+            pct = 100.0 * s['notnull'] / total
+            base = '`{}`:存在;整体非空率 {:.1f}% ({}/{} 抽样)'.format(
+                col, pct, s['notnull'], total)
         else:
-            lines.append('  - `{}`:存在;抽样无数据'.format(col))
+            base = '`{}`:存在;抽样无数据'.format(col)
+        if col == 'create_time' and stats['create_time_earliest']:
+            base += ';按主键序范围 {} ~ {}'.format(
+                stats['create_time_earliest'], stats['create_time_latest'])
+        lines.append('  - ' + base)
+        if col == 'update_time' and stats['recent_total'] is not None:
+            rt = stats['recent_total']
+            rnn = stats['recent_update_notnull']
+            rpct = 100.0 * rnn / rt if rt else 0.0
+            lines.append('    - 近期非空率 {:.1f}% ({}/{} 最近 1000 行)'.format(
+                rpct, rnn, rt))
+
     if stats['del_candidates']:
         lines.append('- 软删字段(含 `del` 子串):' + ', '.join(
             '`{}`'.format(c) for c in stats['del_candidates']))
     else:
-        lines.append('- 软删字段(含 `del` 子串):**未命中** ⚠')
+        lines.append('- 软删字段(含 `del` 子串):未命中')
     return '\n'.join(lines) + '\n'