Преглед изворни кода

refactor(bin): sync gen 探查精简(去抽样非空率 + 加全表 min(update_time))

- 去掉 TABLESAMPLE 抽样整体非空率(含早期 NULL 混合信号,决策无用)
- 加 min(update_time) 全表查询(statement_timeout 60s 保护)
  → 直接给"业务方何时开始维护"切换日,作 backfill 边界
- 近期窗口缩为 100 行(小表更不易"等于全表",速度同样毫秒级)
- 最终探查段每表 5-6 行,含:行数 / 主键 / create_time 主键序范围 /
  update_time 最早非空 + 近期非空率 / 软删字段

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu пре 6 дана
родитељ
комит
f9515df5e8
1 измењених фајлова са 78 додато и 80 уклоњено
  1. 78 80
      bin/datax-sync-template-gen.py

+ 78 - 80
bin/datax-sync-template-gen.py

@@ -48,8 +48,8 @@ WORKSPACE_DEFAULT = os.path.join(
 
 # 探查硬编码:增量同步标准锚点字段(推行后端命名标准)
 ANCHOR_FIELDS = ('create_time', 'update_time')
-# 抽样上限:TABLESAMPLE SYSTEM(1) 按存储页跳跃后再 LIMIT 截断
-PROBE_SAMPLE_LIMIT = 1000
+# 近期窗口:按 PK 倒序取 N 行验证 update_time 是否近期连续维护
+PROBE_RECENT_LIMIT = 100
 
 
 def resolve_datasource(ds_ref):
@@ -104,16 +104,14 @@ def query_columns_full(conn, schema, table):
 
 
 def probe_table(conn, schema, table, full_rows):
-    """对表做行数估值 + PK + 锚点抽样 + 近期 update_time + 主键序时间范围 + 软删命中。
+    """对表做行数 + PK + 锚点(主键序范围 / 最早非空 update_time / 近期连续)+ 软删命中。
 
     - 行数:pg_class.reltuples 估值
-    - PK:单/复合/无 + 是否自增(pg_get_serial_sequence
-    - 锚点:create_time / update_time 存在性 + 抽样非空率(TABLESAMPLE SYSTEM(1) LIMIT 1000)
-    - 近期 update_time 非空率:仅当单 PK + update_time 存在;ORDER BY pk DESC LIMIT 1000
-    - create_time 主键序范围:仅当单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1
+    - PK:单/复合/无 + 是否自增(attidentity + default 表达式 nextval 双判
+    - create_time 主键序范围:单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1
+    - update_time 最早非空:全表 min(update_time),加 statement_timeout 60s 保护
+    - update_time 近期连续:单 PK 时 ORDER BY pk DESC LIMIT 100 验证近期是否连续维护
     - 软删:full_rows 筛 'del' 子串(不区分大小写)
-
-    返回 dict 见 render_probe_md 引用字段。
     """
     cur = conn.cursor()
 
@@ -129,9 +127,6 @@ def probe_table(conn, schema, table, full_rows):
     pk_cols = [r[1] for r in full_rows if r[4] == 'PK']
     pk_auto_increment = False
     if len(pk_cols) == 1:
-        # pg_get_serial_sequence 只识别 OWNED BY 关联的 sequence——
-        # 业务库手工建的 sequence 没 OWNED BY 标记会漏判,所以同时查 attidentity
-        # (PG 10+ IDENTITY 列)和 default 表达式(含 nextval 即视为自增)。
         cur.execute("""
             SELECT a.attidentity, pg_get_expr(ad.adbin, ad.adrelid)
             FROM pg_attribute a
@@ -149,53 +144,53 @@ def probe_table(conn, schema, table, full_rows):
             )
 
     field_names = {r[1] for r in full_rows}
-    anchor = {col: {'exists': col in field_names, 'notnull': None}
-              for col in ANCHOR_FIELDS}
-
-    sample_total = 0
-    present = [c for c in ANCHOR_FIELDS if anchor[c]['exists']]
-    if present:
-        notnull_select = ', '.join('count("{}")'.format(c) for c in present)
-        sql = (
-            'SELECT count(*), {nn} FROM '
-            '(SELECT * FROM "{schema}"."{table}" '
-            ' TABLESAMPLE SYSTEM(1) LIMIT {lim}) AS sub'
-        ).format(nn=notnull_select, schema=schema, table=table,
-                 lim=PROBE_SAMPLE_LIMIT)
-        cur.execute(sql)
-        result = cur.fetchone()
-        sample_total = int(result[0])
-        for i, c in enumerate(present):
-            anchor[c]['notnull'] = int(result[i + 1])
+    create_exists = 'create_time' in field_names
+    update_exists = 'update_time' in field_names
+
+    create_earliest = None
+    create_latest = None
+    if pk_auto_increment and create_exists:
+        cur.execute(
+            'SELECT '
+            '(SELECT create_time FROM "{s}"."{t}" ORDER BY "{p}" ASC LIMIT 1), '
+            '(SELECT create_time FROM "{s}"."{t}" ORDER BY "{p}" DESC LIMIT 1)'
+            .format(s=schema, t=table, p=pk_cols[0]))
+        r = cur.fetchone()
+        create_earliest, create_latest = r[0], r[1]
+
+    update_earliest = None
+    update_earliest_timeout = False
+    if update_exists:
+        try:
+            cur.execute("SET statement_timeout = '60s'")
+            cur.execute('SELECT min(update_time) FROM "{s}"."{t}"'.format(
+                s=schema, t=table))
+            update_earliest = cur.fetchone()[0]
+        except Exception:
+            update_earliest_timeout = True
+            try:
+                conn.rollback()
+            except Exception:
+                pass
+        try:
+            cur.execute("RESET statement_timeout")
+        except Exception:
+            try:
+                conn.rollback()
+                cur.execute("RESET statement_timeout")
+            except Exception:
+                pass
 
     recent_total = None
     recent_update_notnull = None
-    if len(pk_cols) == 1 and anchor['update_time']['exists']:
-        sql = (
+    if len(pk_cols) == 1 and update_exists:
+        cur.execute(
             'SELECT count(*), count("update_time") FROM '
-            '(SELECT update_time FROM "{schema}"."{table}" '
-            ' ORDER BY "{pk}" DESC LIMIT {lim}) AS sub'
-        ).format(schema=schema, table=table, pk=pk_cols[0],
-                 lim=PROBE_SAMPLE_LIMIT)
-        cur.execute(sql)
-        result = cur.fetchone()
-        recent_total = int(result[0])
-        recent_update_notnull = int(result[1])
-
-    create_time_earliest = None
-    create_time_latest = None
-    if pk_auto_increment and anchor['create_time']['exists']:
-        sql = (
-            'SELECT '
-            '(SELECT create_time FROM "{schema}"."{table}" '
-            ' ORDER BY "{pk}" ASC LIMIT 1), '
-            '(SELECT create_time FROM "{schema}"."{table}" '
-            ' ORDER BY "{pk}" DESC LIMIT 1)'
-        ).format(schema=schema, table=table, pk=pk_cols[0])
-        cur.execute(sql)
-        result = cur.fetchone()
-        create_time_earliest = result[0]
-        create_time_latest = result[1]
+            '(SELECT update_time FROM "{s}"."{t}" ORDER BY "{p}" DESC LIMIT {n}) AS sub'
+            .format(s=schema, t=table, p=pk_cols[0], n=PROBE_RECENT_LIMIT))
+        r = cur.fetchone()
+        recent_total = int(r[0])
+        recent_update_notnull = int(r[1])
 
     del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
 
@@ -203,12 +198,14 @@ def probe_table(conn, schema, table, full_rows):
         'reltuples': reltuples,
         'pk_cols': pk_cols,
         'pk_auto_increment': pk_auto_increment,
-        'sample_total': sample_total,
-        'anchor': anchor,
+        'create_exists': create_exists,
+        'create_earliest': create_earliest,
+        'create_latest': create_latest,
+        'update_exists': update_exists,
+        'update_earliest': update_earliest,
+        'update_earliest_timeout': update_earliest_timeout,
         'recent_total': recent_total,
         'recent_update_notnull': recent_update_notnull,
-        'create_time_earliest': create_time_earliest,
-        'create_time_latest': create_time_latest,
         'del_candidates': del_candidates,
     }
 
@@ -231,28 +228,29 @@ def render_probe_md(stats):
     lines.append('- 主键:' + pk_desc)
 
     lines.append('- 锚点字段:')
-    total = stats['sample_total']
-    for col in ANCHOR_FIELDS:
-        s = stats['anchor'][col]
-        if not s['exists']:
-            lines.append('  - `{}`:缺失'.format(col))
-            continue
-        if total > 0 and s['notnull'] is not None:
-            pct = 100.0 * s['notnull'] / total
-            base = '`{}`:存在;整体非空率 {:.1f}% ({}/{} 抽样)'.format(
-                col, pct, s['notnull'], total)
+    if not stats['create_exists']:
+        lines.append('  - `create_time`:缺失')
+    elif stats['create_earliest']:
+        lines.append('  - `create_time`:存在;按主键序范围 {} ~ {}'.format(
+            stats['create_earliest'], stats['create_latest']))
+    else:
+        lines.append('  - `create_time`:存在')
+
+    if not stats['update_exists']:
+        lines.append('  - `update_time`:缺失')
+    else:
+        parts = ['`update_time`:存在']
+        if stats['update_earliest_timeout']:
+            parts.append('最早非空(查询超时)')
+        elif stats['update_earliest'] is not None:
+            parts.append('最早非空 {}'.format(stats['update_earliest']))
         else:
-            base = '`{}`:存在;抽样无数据'.format(col)
-        if col == 'create_time' and stats['create_time_earliest']:
-            base += ';按主键序范围 {} ~ {}'.format(
-                stats['create_time_earliest'], stats['create_time_latest'])
-        lines.append('  - ' + base)
-        if col == 'update_time' and stats['recent_total'] is not None:
-            rt = stats['recent_total']
-            rnn = stats['recent_update_notnull']
-            rpct = 100.0 * rnn / rt if rt else 0.0
-            lines.append('    - 近期非空率 {:.1f}% ({}/{} 最近 1000 行)'.format(
-                rpct, rnn, rt))
+            parts.append('最早非空(全 NULL)')
+        if stats['recent_total'] is not None and stats['recent_total'] > 0:
+            rpct = 100.0 * stats['recent_update_notnull'] / stats['recent_total']
+            parts.append('近期 {} 行非空率 {:.1f}%'.format(
+                stats['recent_total'], rpct))
+        lines.append('  - ' + ';'.join(parts))
 
     if stats['del_candidates']:
         lines.append('- 软删字段(含 `del` 子串):' + ', '.join(