Преглед на файлове

refactor(dim/usr): init dt < ${dt} 落 ${pdt} 分区 + sche 改 full join 三表 COALESCE 简化

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu преди 3 дни
родител
ревизия
ad7925a
променени са 2 файла, в които са добавени 75 реда и са изтрити 104 реда
  1. 62 89
      jobs/dim/usr/dim_usr_user_ful_d.sql
  2. 13 15
      manual/backfill/20260509_dim_usr_user_ful_d_init.sql

+ 62 - 89
jobs/dim/usr/dim_usr_user_ful_d.sql

@@ -1,26 +1,18 @@
--- 作者:tianyu.chu
--- 日期:2026-05-09
--- 工单:(无)
--- 目的:dim_usr_user_ful_d 日常增量(kb/28 §1.3):
---       昨日 dim (dt=${pdt}) + 今日 ods 增量 (dt=${dt}) 按 user_id 合并去重,落 dt=${dt} 单分区
--- 状态:[草案]
--- 备注:sched=T,${dt}=T-1,${pdt}=T-2;
---       (a) 找今日变更 user_id 集合(base 或 cert 任一变)
---       (b) 这些 user_id 扫 ods dt<=${dt} 重新取最新 base + cert(兼顾 base 没变只 cert 变 / cert 没变只 base 变)
---       (c) 昨日 dim 中没变的 user_id 直接保留
---       (d) UNION ALL 写入 dim dt=${dt} 单分区;
---       cert_info update_time 几乎全空,ORDER BY 用 COALESCE(update_time, create_time);
---       前置 DS DEPENDENT:ods_usr_app_base_user_inc_d.dt=${dt} + ods_usr_app_user_cert_info_inc_d.dt=${dt} + dim_usr_user_ful_d.dt=${pdt}
+-- 作者:tianyu.chu
+-- 日期:2026-05-09
+-- 工单:(无)
+-- 目的:dim_usr_user_ful_d 日常增量(kb/28 §1.3):
+--      new = 今日 ods (base / cert) 增量;old = dim.dt=${pdt} 昨日全量;
+--      old FULL OUTER JOIN today_base FULL OUTER JOIN today_cert,字段 COALESCE(today, old) 优先取今日
+-- 状态:[草案]
+-- 备注:sched=T,${dt}=T-1,${pdt}=T-2;
+--      ods 增量同步只含当天有 update 的 user,所以今日 ods 即"今日变更行";
+--      base 字段从 today_base 取(今日没变就 NULL→fallback old);cert 字段从 today_cert 取同理;
+--      cert_info update_time 几乎全空,ORDER BY 用 COALESCE(update_time, create_time);
+--      前置 DS DEPENDENT:ods.base_user.dt=${dt} + ods.cert_info.dt=${dt} + dim.dt=${pdt}
 
 INSERT OVERWRITE TABLE dim.dim_usr_user_ful_d PARTITION (dt='${dt}')
-WITH today_changed_users AS (
-    -- 今日变更的 user_id 集合(base 或 cert 任一变)
-    SELECT DISTINCT id AS user_id FROM ods.ods_usr_app_base_user_inc_d WHERE dt = '${dt}'
-    UNION
-    SELECT DISTINCT user_id FROM ods.ods_usr_app_user_cert_info_inc_d WHERE dt = '${dt}'
-),
-today_base_latest AS (
-    -- 今日变更 user 的最新 base 版本(扫 ods <= ${dt},只取 today_changed_users 集合)
+WITH today_base AS (
     SELECT *
     FROM (
         SELECT *,
@@ -29,13 +21,11 @@ today_base_latest AS (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_usr_app_base_user_inc_d
-        WHERE dt <= '${dt}'
-          AND id IN (SELECT user_id FROM today_changed_users)
+        WHERE dt = '${dt}'
     ) t
     WHERE t.rn = 1
 ),
-today_cert_latest AS (
-    -- 今日变更 user 的最新 cert 版本
+today_cert AS (
     SELECT *
     FROM (
         SELECT *,
@@ -44,71 +34,54 @@ today_cert_latest AS (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_usr_app_user_cert_info_inc_d
-        WHERE dt <= '${dt}'
-          AND user_id IN (SELECT user_id FROM today_changed_users)
+        WHERE dt = '${dt}'
     ) t
     WHERE t.rn = 1
 ),
-today_dim_rebuild AS (
-    -- 今日变更 user 重 join 形成新 dim 行
-    SELECT
-        bu.id                              AS user_id,
-        bu.appid                           AS appid,
-        bu.username                        AS username,
-        bu.code                            AS code,
-        ci.cert_sex                        AS sex_cert,
-        ci.cert_birthday                   AS birthday_cert,
-        ci.cert_province                   AS cert_province,
-        ci.cert_city                       AS cert_city,
-        CASE WHEN ci.user_id IS NOT NULL THEN TRUE ELSE FALSE END AS is_cert,
-        bu.id_card                         AS id_card,
-        bu.face_verify                     AS face_verify,
-        bu.cancel_verify_num               AS cancel_verify_num,
-        bu.register_channel                AS register_channel,
-        bu.register_addr                   AS register_addr,
-        bu.register_ip_addr                AS register_ip_addr,
-        bu.create_time                     AS reg_create_time,
-        bu.login_addr                      AS login_addr,
-        bu.login_ip_addr                   AS login_ip_addr,
-        bu.level                           AS level,
-        bu.member_level                    AS member_level,
-        bu.member_name                     AS member_name,
-        bu.growth_num                      AS growth_num,
-        bu.current_month_growth            AS current_month_growth,
-        bu.member_keep_growth              AS member_keep_growth,
-        bu.member_init_flag                AS member_init_flag,
-        bu.point                           AS point,
-        bu.consume_amount                  AS consume_amount_cny,
-        bu.order_total_num                 AS order_total_num,
-        bu.status                          AS status,
-        bu.blacklist                       AS blacklist,
-        bu.refuse_pick_up                  AS refuse_pick_up,
-        bu.notify_flag                     AS notify_flag,
-        bu.open_invoice                    AS open_invoice,
-        bu.open_psd                        AS open_psd,
-        bu.daily_limit                     AS daily_limit,
-        bu.weekly_limit                    AS weekly_limit,
-        bu.monthly_limit                   AS monthly_limit,
-        bu.update_time                     AS last_update_time,
-        bu.is_deleted                      AS is_deleted,
-        CURRENT_TIMESTAMP()                AS etl_time
-    FROM today_base_latest bu
-    LEFT JOIN today_cert_latest ci ON bu.id = ci.user_id
-),
-yesterday_dim_unchanged AS (
-    -- 昨日 dim 中今日没变的 user_id 直接保留
-    SELECT
-        user_id, appid, username, code, sex_cert, birthday_cert, cert_province, cert_city, is_cert,
-        id_card, face_verify, cancel_verify_num, register_channel, register_addr, register_ip_addr,
-        reg_create_time, login_addr, login_ip_addr, level, member_level, member_name,
-        growth_num, current_month_growth, member_keep_growth, member_init_flag, point,
-        consume_amount_cny, order_total_num, status, blacklist, refuse_pick_up, notify_flag,
-        open_invoice, open_psd, daily_limit, weekly_limit, monthly_limit,
-        last_update_time, is_deleted, etl_time
-    FROM dim.dim_usr_user_ful_d
-    WHERE dt = '${pdt}'
-      AND user_id NOT IN (SELECT user_id FROM today_changed_users)
+old_dim AS (
+    SELECT * FROM dim.dim_usr_user_ful_d WHERE dt = '${pdt}'
 )
-SELECT * FROM today_dim_rebuild
-UNION ALL
-SELECT * FROM yesterday_dim_unchanged;
+SELECT
+    COALESCE(bu.id, ci.user_id, o.user_id)                    AS user_id,
+    COALESCE(bu.appid, o.appid)                               AS appid,
+    COALESCE(bu.username, o.username)                         AS username,
+    COALESCE(bu.code, o.code)                                 AS code,
+    COALESCE(ci.cert_sex, o.sex_cert)                         AS sex_cert,
+    COALESCE(ci.cert_birthday, o.birthday_cert)               AS birthday_cert,
+    COALESCE(ci.cert_province, o.cert_province)               AS cert_province,
+    COALESCE(ci.cert_city, o.cert_city)                       AS cert_city,
+    CASE WHEN ci.user_id IS NOT NULL OR o.is_cert = TRUE THEN TRUE ELSE FALSE END AS is_cert,
+    COALESCE(bu.id_card, o.id_card)                           AS id_card,
+    COALESCE(bu.face_verify, o.face_verify)                   AS face_verify,
+    COALESCE(bu.cancel_verify_num, o.cancel_verify_num)       AS cancel_verify_num,
+    COALESCE(bu.register_channel, o.register_channel)         AS register_channel,
+    COALESCE(bu.register_addr, o.register_addr)               AS register_addr,
+    COALESCE(bu.register_ip_addr, o.register_ip_addr)         AS register_ip_addr,
+    COALESCE(bu.create_time, o.reg_create_time)               AS reg_create_time,
+    COALESCE(bu.login_addr, o.login_addr)                     AS login_addr,
+    COALESCE(bu.login_ip_addr, o.login_ip_addr)               AS login_ip_addr,
+    COALESCE(bu.level, o.level)                               AS level,
+    COALESCE(bu.member_level, o.member_level)                 AS member_level,
+    COALESCE(bu.member_name, o.member_name)                   AS member_name,
+    COALESCE(bu.growth_num, o.growth_num)                     AS growth_num,
+    COALESCE(bu.current_month_growth, o.current_month_growth) AS current_month_growth,
+    COALESCE(bu.member_keep_growth, o.member_keep_growth)     AS member_keep_growth,
+    COALESCE(bu.member_init_flag, o.member_init_flag)         AS member_init_flag,
+    COALESCE(bu.point, o.point)                               AS point,
+    COALESCE(bu.consume_amount, o.consume_amount_cny)         AS consume_amount_cny,
+    COALESCE(bu.order_total_num, o.order_total_num)           AS order_total_num,
+    COALESCE(bu.status, o.status)                             AS status,
+    COALESCE(bu.blacklist, o.blacklist)                       AS blacklist,
+    COALESCE(bu.refuse_pick_up, o.refuse_pick_up)             AS refuse_pick_up,
+    COALESCE(bu.notify_flag, o.notify_flag)                   AS notify_flag,
+    COALESCE(bu.open_invoice, o.open_invoice)                 AS open_invoice,
+    COALESCE(bu.open_psd, o.open_psd)                         AS open_psd,
+    COALESCE(bu.daily_limit, o.daily_limit)                   AS daily_limit,
+    COALESCE(bu.weekly_limit, o.weekly_limit)                 AS weekly_limit,
+    COALESCE(bu.monthly_limit, o.monthly_limit)               AS monthly_limit,
+    COALESCE(bu.update_time, o.last_update_time)              AS last_update_time,
+    COALESCE(bu.is_deleted, o.is_deleted)                     AS is_deleted,
+    CURRENT_TIMESTAMP()                                       AS etl_time
+FROM old_dim o
+FULL OUTER JOIN today_base bu ON o.user_id = bu.id
+FULL OUTER JOIN today_cert ci ON COALESCE(o.user_id, bu.id) = ci.user_id;

+ 13 - 15
manual/backfill/20260509_dim_usr_user_ful_d_init.sql

@@ -1,17 +1,15 @@
--- 作者:tianyu.chu
--- 日期:2026-05-09
--- 工单:(无)
--- 目的:dim_usr_user_ful_d 初始化(kb/28 §1.3):扫 ods 全量历史分区 + ROW_NUMBER 取每 pk 最新版本,
---       base_user LEFT JOIN cert_info 落 dim dt=${dt} 单分区
--- 状态:[待执行]
--- 备注:${dt} 由 DS 填最新可用 ods dt(首日分区);cert_info update_time 几乎全空,
---       ORDER BY 用 COALESCE(update_time, create_time) 兜底;
---       跑一次后由 jobs/dim/usr/dim_usr_user_ful_d.sql 接管日常增量
+-- 作者:tianyu.chu
+-- 日期:2026-05-09
+-- 工单:(无)
+-- 目的:dim_usr_user_ful_d 初始化(kb/28 §1.3):扫 ods 历史分区 (dt < ${dt}) + ROW_NUMBER 取每 pk 最新版本,
+--      base_user LEFT JOIN cert_info 落 dim dt=${pdt} 单分区
+-- 状态:[待执行]
+-- 备注:首次上线 init 与 sche 同一天跑(sched=T),${dt}=T-1,${pdt}=T-2;
+--      init 灌 dim.dt=${pdt} 作为 sche 首次依赖的"昨日 dim";扫 ods 范围 dt < ${dt} (即 dt <= ${pdt});
+--      cert_info update_time 几乎全空,ORDER BY 用 COALESCE(update_time, create_time) 兜底;
+--      跑一次后由 jobs/dim/usr/dim_usr_user_ful_d.sql 接管日常增量
 
--- 动态分区不需要(init 单分区写入),但首次会扫 ods 多 dt:
--- ods.base_user 30 万行 × N dt + ods.cert_info 15 万行 × N dt,ROW_NUMBER 取最新即可
-
-INSERT OVERWRITE TABLE dim.dim_usr_user_ful_d PARTITION (dt='${dt}')
+INSERT OVERWRITE TABLE dim.dim_usr_user_ful_d PARTITION (dt='${pdt}')
 SELECT
     bu.id                              AS user_id,
     bu.appid                           AS appid,
@@ -62,7 +60,7 @@ FROM (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_usr_app_base_user_inc_d
-        WHERE dt <= '${dt}'
+        WHERE dt < '${dt}'
     ) t
     WHERE t.rn = 1
 ) bu
@@ -75,7 +73,7 @@ LEFT JOIN (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_usr_app_user_cert_info_inc_d
-        WHERE dt <= '${dt}'
+        WHERE dt < '${dt}'
     ) t
     WHERE t.rn = 1
 ) ci ON bu.id = ci.user_id;