Преглед изворни кода

refactor(bin): sync gen 探查去掉近期 N 行非空率

ORDER BY pk DESC LIMIT N 取的是"id 最大的 N 行"=最近创建的行,
update_time NULL 可能只是这些行还没被 update 过,跟"业务方是否启用
update_time 维护"无关。删除该指标,避免误导评审。

只要 min(update_time) 非 NULL 即视为业务方已启用维护。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu пре 6 дана
родитељ
комит
ff97afdd19
1 измењених фајлова са 7 додато и 26 уклоњено
  1. 7 26
      bin/datax-sync-template-gen.py

+ 7 - 26
bin/datax-sync-template-gen.py

@@ -48,8 +48,6 @@ WORKSPACE_DEFAULT = os.path.join(
 
 # 探查硬编码:增量同步标准锚点字段(推行后端命名标准)
 ANCHOR_FIELDS = ('create_time', 'update_time')
-# 近期窗口:按 PK 倒序取 N 行验证 update_time 是否近期连续维护
-PROBE_RECENT_LIMIT = 100
 
 
 def resolve_datasource(ds_ref):
@@ -104,13 +102,14 @@ def query_columns_full(conn, schema, table):
 
 
 def probe_table(conn, schema, table, full_rows):
-    """对表做行数 + PK + 锚点(主键序范围 / 最早非空 update_time / 近期连续)+ 软删命中。
+    """对表做行数 + PK + 锚点(主键序范围 / 最早非空 update_time)+ 软删命中。
 
     - 行数:pg_class.reltuples 估值
     - PK:单/复合/无 + 是否自增(attidentity + default 表达式 nextval 双判)
     - create_time 主键序范围:单列自增 PK + create_time 存在;ORDER BY pk ASC/DESC LIMIT 1
     - update_time 最早非空:全表 min(update_time),加 statement_timeout 60s 保护
-    - update_time 近期连续:单 PK 时 ORDER BY pk DESC LIMIT 100 验证近期是否连续维护
+      (非空即视为业务方已启用 update_time 维护;近期 100 行非空率不能区分
+      "业务方未维护"和"近期创建的行还没被 update 过",所以不再探查)
     - 软删:full_rows 筛 'del' 子串(不区分大小写)
     """
     cur = conn.cursor()
@@ -181,17 +180,6 @@ def probe_table(conn, schema, table, full_rows):
             except Exception:
                 pass
 
-    recent_total = None
-    recent_update_notnull = None
-    if len(pk_cols) == 1 and update_exists:
-        cur.execute(
-            'SELECT count(*), count("update_time") FROM '
-            '(SELECT update_time FROM "{s}"."{t}" ORDER BY "{p}" DESC LIMIT {n}) AS sub'
-            .format(s=schema, t=table, p=pk_cols[0], n=PROBE_RECENT_LIMIT))
-        r = cur.fetchone()
-        recent_total = int(r[0])
-        recent_update_notnull = int(r[1])
-
     del_candidates = sorted(r[1] for r in full_rows if 'del' in r[1].lower())
 
     return {
@@ -204,8 +192,6 @@ def probe_table(conn, schema, table, full_rows):
         'update_exists': update_exists,
         'update_earliest': update_earliest,
         'update_earliest_timeout': update_earliest_timeout,
-        'recent_total': recent_total,
-        'recent_update_notnull': recent_update_notnull,
         'del_candidates': del_candidates,
     }
 
@@ -239,18 +225,13 @@ def render_probe_md(stats):
     if not stats['update_exists']:
         lines.append('  - `update_time`:缺失')
     else:
-        parts = ['`update_time`:存在']
         if stats['update_earliest_timeout']:
-            parts.append('最早非空(查询超时)')
+            tail = '最早非空(查询超时)'
         elif stats['update_earliest'] is not None:
-            parts.append('最早非空 {}'.format(stats['update_earliest']))
+            tail = '最早非空 {}'.format(stats['update_earliest'])
         else:
-            parts.append('最早非空(全 NULL)')
-        if stats['recent_total'] is not None and stats['recent_total'] > 0:
-            rpct = 100.0 * stats['recent_update_notnull'] / stats['recent_total']
-            parts.append('近期 {} 行非空率 {:.1f}%'.format(
-                stats['recent_total'], rpct))
-        lines.append('  - ' + ';'.join(parts))
+            tail = '最早非空(全 NULL,业务方未启用维护)'
+        lines.append('  - `update_time`:存在;' + tail)
 
     if stats['del_candidates']:
         lines.append('- 软删字段(含 `del` 子串):' + ', '.join(