Преглед изворни кода

refactor(bin): sync gen update_time 探查改抽样判二元启用

- 去掉 min(update_time) 全表查询(启用日期对决策意义不大,且 62M 大表
  无索引时 60s 超时)
- 改抽样判"是否启用":抽样里有任何非空 update_time 即视为业务方已启用
- 大表(reltuples >= 100k):TABLESAMPLE SYSTEM(1) LIMIT 1000,3 次 retry
  避开偶发抽 0 行
- 小表(reltuples < 100k):全表 count(毫秒级),避开 SYSTEM 抽样在小表
  page 集中时的概率失败
- 文案:抽样全 NULL 时显示"猜测未启用,需对账"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu пре 6 дана
родитељ
комит
a9f15c6a7d
1 измењених фајлова са 39 додато и 32 уклоњено
  1. 39 32
      bin/datax-sync-template-gen.py

+ 39 - 32
bin/datax-sync-template-gen.py

@@ -48,6 +48,10 @@ WORKSPACE_DEFAULT = os.path.join(
 
 # 探查硬编码:增量同步标准锚点字段(推行后端命名标准)
 ANCHOR_FIELDS = ('create_time', 'update_time')
+# 抽样上限:TABLESAMPLE SYSTEM(1) 按存储页跳跃后再 LIMIT 截断
+PROBE_SAMPLE_LIMIT = 1000
+# 小表阈值:reltuples 低于此值直接走全表 count(毫秒级,避免 SYSTEM 抽样在小表上概率失败)
+PROBE_SMALL_TABLE_THRESHOLD = 100000
 
 
 def resolve_datasource(ds_ref):
@@ -102,14 +106,14 @@ def query_columns_full(conn, schema, table):
 
 
 def probe_table(conn, schema, table, full_rows):
-    """对表做行数 + PK + 锚点(主键序范围 / 最早非空 update_time)+ 软删命中。
+    """对表做行数 + PK + 锚点(主键序范围 / update_time 抽样非空)+ 软删命中。
 
     - 行数:pg_class.reltuples 估值
     - PK:单/复合/无 + 是否自增(attidentity + default 表达式 nextval 双判)
     - create_time 主键序范围:单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1
-    - update_time 最早非空:全表 min(update_time),加 statement_timeout 60s 保护
-      (非空即视为业务方已启用 update_time 维护;近期 100 行非空率不能区分
-      "业务方未维护"和"近期创建的行还没被 update 过",所以不再探查
+    - update_time 抽样:TABLESAMPLE SYSTEM(1) LIMIT 1000,非空数 > 0 即视为业务方已启用
+      (不取最早非空时间——全表 min 在大表上慢;启用日期对决策意义不大,
+      只关心"是否启用"二元值
     - 软删:full_rows 筛 'del' 子串(不区分大小写)
     """
     cur = conn.cursor()
@@ -157,28 +161,29 @@ def probe_table(conn, schema, table, full_rows):
         r = cur.fetchone()
         create_earliest, create_latest = r[0], r[1]
 
-    update_earliest = None
-    update_earliest_timeout = False
+    update_sample_total = 0
+    update_sample_notnull = 0
     if update_exists:
-        try:
-            cur.execute("SET statement_timeout = '60s'")
-            cur.execute('SELECT min(update_time) FROM "{s}"."{t}"'.format(
-                s=schema, t=table))
-            update_earliest = cur.fetchone()[0]
-        except Exception:
-            update_earliest_timeout = True
-            try:
-                conn.rollback()
-            except Exception:
-                pass
-        try:
-            cur.execute("RESET statement_timeout")
-        except Exception:
-            try:
-                conn.rollback()
-                cur.execute("RESET statement_timeout")
-            except Exception:
-                pass
+        if reltuples < PROBE_SMALL_TABLE_THRESHOLD:
+            # 小表全表 count,避开 SYSTEM 抽样在小表上的概率失败
+            cur.execute('SELECT count(*), count("update_time") FROM "{s}"."{t}"'
+                        .format(s=schema, t=table))
+            r = cur.fetchone()
+            update_sample_total = int(r[0])
+            update_sample_notnull = int(r[1])
+        else:
+            # 大表 SYSTEM(1) LIMIT 1000 抽样,3 次 retry 避开偶发 0 行
+            for _ in range(3):
+                cur.execute(
+                    'SELECT count(*), count("update_time") FROM '
+                    '(SELECT update_time FROM "{s}"."{t}" '
+                    ' TABLESAMPLE SYSTEM(1) LIMIT {n}) AS sub'
+                    .format(s=schema, t=table, n=PROBE_SAMPLE_LIMIT))
+                r = cur.fetchone()
+                update_sample_total = int(r[0])
+                update_sample_notnull = int(r[1])
+                if update_sample_total > 0:
+                    break
 
     del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
 
@@ -190,8 +195,8 @@ def probe_table(conn, schema, table, full_rows):
         'create_earliest': create_earliest,
         'create_latest': create_latest,
         'update_exists': update_exists,
-        'update_earliest': update_earliest,
-        'update_earliest_timeout': update_earliest_timeout,
+        'update_sample_total': update_sample_total,
+        'update_sample_notnull': update_sample_notnull,
         'del_candidates': del_candidates,
     }
 
@@ -225,12 +230,14 @@ def render_probe_md(stats):
     if not stats['update_exists']:
         lines.append('  - `update_time`:缺失')
     else:
-        if stats['update_earliest_timeout']:
-            tail = '最早非空(查询超时)'
-        elif stats['update_earliest'] is not None:
-            tail = '最早非空 {}'.format(stats['update_earliest'])
+        nn = stats['update_sample_notnull']
+        total = stats['update_sample_total']
+        if total == 0:
+            tail = '抽样无数据'
+        elif nn > 0:
+            tail = '抽样 {} 行 {} 行非空(已启用)'.format(total, nn)
         else:
-            tail = '最早非空(全 NULL,业务方未启用维护)'
+            tail = '抽样 {} 行全 NULL(猜测未启用,需对账)'.format(total)
         lines.append('  - `update_time`:存在;' + tail)
 
     if stats['del_candidates']: