Jelajahi Sumber

refactor(jobs/dwd): 日调度+补数 SQL 合一为参数化 dwd_trd_order_pay_apd_d.sql

原日调度(N=2)与 _backfill.sql 逻辑 100% 重合,仅 start_date 来源不同,
合并为一份参数化 SQL(宽扫窄落+status外层),删 _backfill.sql。两个 DS
workflow 传不同参数区分职能:日调度 start_date=$[yyyyMMdd-30] 挂 schedule,
补数 start_date=手动 不挂。kb/30 §4.2 + ADR-11 同步更新。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 1 Minggu lalu
induk
melakukan
2c017616f2

+ 21 - 15
jobs/dwd/trd/dwd_trd_order_pay_apd_d.sql

@@ -1,18 +1,23 @@
 -- 作者:tianyu.chu
--- 日期:2026-05-10
+-- 日期:2026-05-10(2026-06-03 日调度+补数合一为统一参数化)
 -- 工单:(无)
--- 目的:dwd_trd_order_pay_apd_d 日常增量(kb/27 §1.4 + §2):
---      回算近 2 日(kb/20 §7.3 通用兜底,业界主流 N=2):扫 ods.dt IN (${dt}, ${pdt}) +
---      过滤业务时间 DATE(payment_success_time) IN (${dt}, ${pdt}) + 状态码筛选支付成功 +
---      ROW_NUMBER 取每 order_id 最新版本 +
---      LEFT JOIN dim_trd_card_group_ful_d.dt=${dt} 维度退化 +
---      11 字段金额 mer_act% 派生(kb/27 §2.5)+ 动态分区写入 dwd dt IN (${dt}, ${pdt})
+-- 目的:dwd_trd_order_pay_apd_d 统一计算 SQL(kb/93 ADR-09/ADR-11 宽扫窄落):
+--      日调度 + 手动补数共用本文件,传参区分职能(两个独立 DS workflow):
+--        日调度(滚动 N=30): -p start_date=$[yyyyMMdd-30] -p stop_date=${cdt}(挂 schedule 自动跑)
+--        手动补数:           -p start_date=<起始> -p stop_date=${cdt}(不挂 schedule,运维兜底)
+--      ods.dt 宽扫 [${start_date}-1, 不限](捞回 update_time 漂移到后续 ods 分区的版本);
+--      payment 业务时间窄落 [${start_date}, ${stop_date}) 左闭右开;
+--      status 在 ROW_NUMBER 外层判最新版本(避免捞回已退款单的退款前版本,见 ADR-09/12);
+--      LEFT JOIN dim_trd.dt=${stop_date}-1(最新全量快照退化 category 等)
 -- 状态:[草案]
--- 备注:sched=T,${dt}=T-1,${pdt}=T-2;
---      回算 N=2 兜底跨零点漂移:业务时间 T-1 但 update_time 漂到 T 的事件,在 T+1 跑批时通过扫 ods.dt=T 兜回;
---      INSERT OVERWRITE 动态分区(kb/26 §8 项目默认 DYNAMIC mode):只覆盖 SELECT 出现的 dt 分区,不动其他历史分区;
---      dwd_pay 是数仓支付明细,不限定 order_type(所有订单类型都进);
---      前置 DS DEPENDENT:ods.${dt} + dim.${dt}(${pdt} 历史分区已就绪)
+-- 备注:${stop_date}=${cdt}(today,左闭右开覆盖到 today-1,当天不落交次日);
+--      回算窗 N=30 依据退款窗实测(P95=15/MAX=20,ADR-09);跨多 dt 动态分区 SET 上限 2000;
+--      前置:ods 补到 today-1 + dim_trd 补到 today-1(否则 category 全 NULL);
+--      dwd 历史可变可接受(ods 真源,dwd 可从 ods 回放重建,ADR-09);
+--      上界不限 = 扫到 ods 最新分区,payment ∈ [${start_date}, ${stop_date}) 的订单 ods 版本落点必 ≤ 最新,全部捞到
+
+set hive.exec.max.dynamic.partitions=2000;
+set hive.exec.max.dynamic.partitions.pernode=200;
 
 INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_apd_d PARTITION (dt)
 SELECT
@@ -80,13 +85,14 @@ FROM (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_trd_card_group_order_info_inc_d
-        WHERE dt IN ('${dt}', '${pdt}')
+        WHERE dt >= DATE_FORMAT(DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP('${start_date}', 'yyyyMMdd')), 1), 'yyyyMMdd')
           AND payment_success_time IS NOT NULL
-          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') IN ('${dt}', '${pdt}')
+          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') >= '${start_date}'
+          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') <  '${stop_date}'
     ) t
     WHERE t.rn = 1
       AND t.status IN (101, 103, 104, 105, 106, 301, 302)
 ) o
 LEFT JOIN dim.dim_trd_card_group_ful_d cg
     ON o.group_info_id = cg.group_info_id
-   AND cg.dt = '${dt}';
+   AND cg.dt = DATE_FORMAT(DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP('${stop_date}', 'yyyyMMdd')), 1), 'yyyyMMdd');

+ 0 - 94
jobs/dwd/trd/dwd_trd_order_pay_apd_d_backfill.sql

@@ -1,94 +0,0 @@
--- 作者:tianyu.chu
--- 日期:2026-06-02
--- 工单:(无)
--- 目的:dwd_trd_order_pay_apd_d 补数(backfill,kb/93 ADR-11 宽扫窄落):
---      调度中断后回刷历史区间。ods.dt 宽扫 [${start_date}-1, 不限](捞回 update_time 漂移到后续 ods 分区的版本);
---      payment 业务时间窄落 [${start_date}, ${stop_date}) 左闭右开(对齐 DataX 范围惯例);
---      status 口径 / 字段派生 / dim 退化 join 与日调度 dwd_trd_order_pay_apd_d.sql 完全一致
--- 状态:[草案]
--- 备注:传 ${start_date}(如 20260518) + ${stop_date}=${cdt}(DS globalParam=today,左闭右开覆盖到 today-1);
---      前置:ods 已补到 today-1 + dim_trd 补到 today-1(LEFT JOIN cg.dt=${stop_date}-1 否则 category 全 NULL);
---      跨多 dt 动态分区,SET 上限 2000;与日调度分离(ADR-11),不挂 schedule,手动传 start_date;
---      上界不限 = 扫到 ods 最新分区,payment ∈ [${start_date}, ${stop_date}) 的订单 ods 版本落点必 ≤ 最新,全部捞到;
---      rawScript: spark-sql-starter.py -f <本文件> -p start_date=${start_date} -p stop_date=${cdt}
-
-set hive.exec.max.dynamic.partitions=2000;
-set hive.exec.max.dynamic.partitions.pernode=200;
-
-INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_apd_d PARTITION (dt)
-SELECT
-    o.id                                                                                AS order_id,
-    o.order_no                                                                          AS order_no,
-    o.combination_no                                                                    AS combination_no,
-    o.give_order_id                                                                     AS give_order_id,
-    o.user_id                                                                           AS user_id,
-    o.merchant_id                                                                       AS merchant_id,
-    o.group_info_id                                                                     AS group_info_id,
-    cg.name                                                                             AS group_name,
-    cg.list_id                                                                          AS list_id,
-    cg.panini_list_id                                                                   AS panini_list_id,
-    cg.category                                                                         AS category,
-    cg.manufacturer                                                                     AS manufacturer,
-    cg.sets                                                                             AS sets,
-    cg.year                                                                             AS year,
-    o.shipping_address_id                                                               AS shipping_address_id,
-    o.purchase_count                                                                    AS purchase_cnt,
-    o.give_num                                                                          AS give_cnt,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.accounts_payable / 100.00, 2)   ELSE o.accounts_payable    END AS payable_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.point / 100.00, 2)              ELSE o.actual_payment      END AS pay_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.trade_amount / 100.00, 2)       ELSE o.trade_amount        END AS trade_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.settlement_amount / 100.00, 2)  ELSE o.settlement_amount   END AS settle_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.card_price / 100.00, 2)         ELSE o.card_price          END AS card_price_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.act_price / 100.00, 2)          ELSE o.act_price           END AS act_price_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.discount / 100.00, 2)           ELSE o.discount            END AS merchant_discount_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.platform_discount / 100.00, 2)  ELSE o.platform_discount   END AS platform_discount_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.member_discount / 100.00, 2)    ELSE o.member_discount     END AS member_discount_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.act_discount / 100.00, 2)       ELSE o.act_discount        END AS act_discount_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.point_deduct / 100.00, 2)       ELSE o.point_deduct        END AS point_deduct_amt_cny,
-    o.shipping_cost                                                                     AS shipping_amt_cny,
-    o.shipping_free_amount                                                              AS shipping_free_amt_cny,
-    CASE WHEN o.point_type LIKE 'mer_act%' THEN ROUND(o.discount_amount / 100.00, 2)    ELSE o.discount_amount     END AS discount_amount_amt_cny,
-    o.point                                                                             AS point,
-    o.discount_point                                                                    AS discount_point,
-    o.coupon                                                                            AS coupon,
-    o.platform_coupon                                                                   AS platform_coupon,
-    o.shipping_free_id                                                                  AS shipping_free_id,
-    o.payment_type                                                                      AS payment_type,
-    o.payment_sub_type                                                                  AS payment_sub_type,
-    o.payment_status                                                                    AS payment_status,
-    o.payment_status_desc                                                               AS payment_status_desc,
-    o.payment_time                                                                      AS payment_time,
-    o.payment_success_time                                                              AS payment_success_time,
-    o.pay_record                                                                        AS pay_record,
-    o.order_type                                                                        AS order_type,
-    o.order_sub_type                                                                    AS order_sub_type,
-    o.give_user_code                                                                    AS give_user_code,
-    o.anonymous                                                                         AS anonymous,
-    o.pick_up_type                                                                      AS pick_up_type,
-    o.point_type                                                                        AS point_type,
-    o.open_self                                                                         AS open_self,
-    o.create_time                                                                       AS order_create_time,
-    o.expire_time                                                                       AS expire_time,
-    o.is_deleted                                                                        AS is_deleted,
-    CURRENT_TIMESTAMP()                                                                 AS etl_time,
-    DATE_FORMAT(o.payment_success_time, 'yyyyMMdd')                                     AS dt
-FROM (
-    SELECT *
-    FROM (
-        SELECT *,
-            ROW_NUMBER() OVER (
-                PARTITION BY id
-                ORDER BY COALESCE(update_time, create_time) DESC
-            ) AS rn
-        FROM ods.ods_trd_card_group_order_info_inc_d
-        WHERE dt >= DATE_FORMAT(DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP('${start_date}', 'yyyyMMdd')), 1), 'yyyyMMdd')
-          AND payment_success_time IS NOT NULL
-          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') >= '${start_date}'
-          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') <  '${stop_date}'
-    ) t
-    WHERE t.rn = 1
-      AND t.status IN (101, 103, 104, 105, 106, 301, 302)
-) o
-LEFT JOIN dim.dim_trd_card_group_ful_d cg
-    ON o.group_info_id = cg.group_info_id
-   AND cg.dt = DATE_FORMAT(DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP('${stop_date}', 'yyyyMMdd')), 1), 'yyyyMMdd');

+ 1 - 1
kb/30-开发规范.md

@@ -358,7 +358,7 @@ manual/ddl/
 
 - **简单表** — `jobs/{layer}/{domain}/{表名}.sql` 一个文件顶到底(单次 `INSERT OVERWRITE`,可带 `WITH` CTE)
 - **多步表** — `jobs/{layer}/{domain}/{表名}/{表名}-{NN}-{描述}.sql`,序号三位,`99` 固定留给最终 `INSERT OVERWRITE` 目标表那一步。DS 工作流对应 N 个 task 节点按序号链式依赖
-- **补数 SQL** — `jobs/{layer}/{domain}/{表名}_backfill.sql`,与日调度同目录、`_backfill` 后缀区分;仅业务时间分区的 dwd 事件表需要(raw_ods / dim 走 DS COMPLEMENT_DATA)。设计见 `93-架构决策.md` ADR-11
+- **日调度 + 补数共用参数化** — 业务时间分区的 dwd 事件表,日调度(滚动重算)与手动补数**共用同一 `{表名}.sql`**(参数化 `start_date`/`stop_date`,宽扫窄落),靠**两个 DS workflow 传不同参数**区分职能:日调度 `-p start_date=$[yyyyMMdd-N]` 挂 schedule,补数 `-p start_date=<起始>` 不挂 schedule。不另建 `_backfill.sql`(避免逻辑两份漂移)。raw_ods / dim 走 DS COMPLEMENT_DATA 无需此模式。设计见 `93-架构决策.md` ADR-09/ADR-11
 
 所有 `.sql` 只写 `INSERT OVERWRITE` / `INSERT INTO`,**不写 CREATE TABLE**(表由 `manual/ddl/` 保证已存在)。
 

+ 3 - 3
kb/93-架构决策.md

@@ -468,10 +468,10 @@
 
 - **背景**:规划 dwd 补数(backfill,调度中断后回刷历史区间)能力。
 
-- **决策**:dwd 补数走独立任务流,与日调度(ADR-09)分离。核心是 **宽扫窄落** —— ods.dt 宽扫(捞回漂移到后续分区的版本)、payment 业务时间窄落(锁定覆盖哪些 dwd 分区):
+- **决策**:dwd 补数与日调度(ADR-09)**职能分离为两个独立 DS workflow,但共用同一参数化 SQL**(2026-06-03 合一:原独立 `_backfill.sql` 与日调度逻辑 100% 重合,仅 start_date 来源不同,合并到 `jobs/dwd/trd/dwd_trd_order_pay_apd_d.sql` 避免两份漂移)。核心是 **宽扫窄落** —— ods.dt 宽扫(捞回漂移到后续分区的版本)、payment 业务时间窄落(锁定覆盖哪些 dwd 分区):
 
-  - 独立 SQL `jobs/dwd/trd/dwd_trd_order_pay_apd_d_backfill.sql`
-  - 独立 DS workflow,手动触发,不挂 schedule,不加 DEPENDENT
+  - 共用 SQL `jobs/dwd/trd/dwd_trd_order_pay_apd_d.sql`(参数化 start_date/stop_date)
+  - 补数 workflow:手动触发,不挂 schedule,不加 DEPENDENT(日调度 workflow 见 ADR-09,挂 schedule 传 start_date=$[yyyyMMdd-30])
   - 参数(对齐 DataX `start_date`/`stop_date` 左闭右开范围惯例,下划线):
     - `${start_date}`:workflow 启动参数(补数起始日)
     - `${stop_date}`:取 `${cdt}`(DS globalParam=today),左闭右开 `< stop_date` 即覆盖到 today-1(当天不落交日调度)