浏览代码

fix(dwd/trd): dwd_pay 删 order_type 过滤 + sche 改回算近 2 日(兜底跨零点漂移)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 2 天之前
父节点
当前提交
a9b6eaa

+ 14 - 11
jobs/dwd/trd/dwd_trd_order_pay_apd_d.sql

@@ -2,17 +2,19 @@
 -- 日期:2026-05-10
 -- 工单:(无)
 -- 目的:dwd_trd_order_pay_apd_d 日常增量(kb/27 §1.4 + §2):
---      扫 ods.dt=${dt} + 过滤业务时间 DATE_FORMAT(payment_success_time)=${dt} + status / order_type 支付成功 +
+--      回算近 2 日(kb/20 §7.3 通用兜底,业界主流 N=2):扫 ods.dt IN (${dt}, ${pdt}) +
+--      过滤业务时间 DATE(payment_success_time) IN (${dt}, ${pdt}) + 状态码筛选支付成功 +
 --      ROW_NUMBER 取每 order_id 最新版本 +
 --      LEFT JOIN dim_trd_card_group_ful_d.dt=${dt} 维度退化 +
---      11 字段金额 mer_act% 派生(kb/27 §2.5)+ 写入 dwd dt=${dt} 单分区
+--      11 字段金额 mer_act% 派生(kb/27 §2.5)+ 动态分区写入 dwd dt IN (${dt}, ${pdt})
 -- 状态:[草案]
--- 备注:sched=T,${dt}=T-1;
---      _apd_d 单分区不回算(ODS 漂移已在 ODS 层归位,kb/27 §1.4);
---      过滤 DATE(payment_success_time)=${dt} 取业务时间是 ${dt} 的事件(排除"今天 update 的旧订单"误归);
---      前置 DS DEPENDENT:ods_trd_card_group_order_info_inc_d.dt=${dt} + dim_trd_card_group_ful_d.dt=${dt}
+-- 备注:sched=T,${dt}=T-1,${pdt}=T-2;
+--      回算 N=2 兜底跨零点漂移:业务时间 T-1 但 update_time 漂到 T 的事件,在 T+1 跑批时通过扫 ods.dt=T 兜回;
+--      INSERT OVERWRITE 动态分区(kb/26 §8 项目默认 DYNAMIC mode):只覆盖 SELECT 出现的 dt 分区,不动其他历史分区;
+--      dwd_pay 是数仓支付明细,不限定 order_type(所有订单类型都进);
+--      前置 DS DEPENDENT:ods.${dt} + dim.${dt}(${pdt} 历史分区已就绪)
 
-INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_apd_d PARTITION (dt='${dt}')
+INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_apd_d PARTITION (dt)
 SELECT
     o.id                                                                                AS order_id,
     o.order_no                                                                          AS order_no,
@@ -67,7 +69,8 @@ SELECT
     o.create_time                                                                       AS order_create_time,
     o.expire_time                                                                       AS expire_time,
     o.is_deleted                                                                        AS is_deleted,
-    CURRENT_TIMESTAMP()                                                                 AS etl_time
+    CURRENT_TIMESTAMP()                                                                 AS etl_time,
+    DATE_FORMAT(o.payment_success_time, 'yyyyMMdd')                                     AS dt
 FROM (
     SELECT *
     FROM (
@@ -77,10 +80,10 @@ FROM (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_trd_card_group_order_info_inc_d
-        WHERE dt = '${dt}'
-          AND order_type = 'group'
+        WHERE dt IN ('${dt}', '${pdt}')
           AND status IN (101, 103, 104, 105, 106, 301, 302)
-          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') = '${dt}'
+          AND payment_success_time IS NOT NULL
+          AND DATE_FORMAT(payment_success_time, 'yyyyMMdd') IN ('${dt}', '${pdt}')
     ) t
     WHERE t.rn = 1
 ) o

+ 3 - 2
kb/27-dwd建模.md

@@ -21,8 +21,9 @@ DWD 直引 DIM 层已清洗字段,**不在 DWD 二次清洗、不做空值兜
 ### 1.4 分区与写入
 
 - 分区锚点:业务时间(事件发生日,如 `payment_success_time`),不是抽取日
-- 写入策略:默认单分区 `dt=T-1`,不回算(ODS 漂移已在 ODS 层归位,DWD 不二次兜底)
-- 重跑幂等:`INSERT OVERWRITE` 单分区
+- 写入策略:**回算近 2 日**(`dt IN (${dt}, ${pdt})`),兜底跨零点 ODS 漂移(业务时间 T-1 但 update_time 漂到 T 的事件,第二天跑批时通过扫 ods.dt=T 兜回);动态分区写入(kb/26 §8 项目默认 DYNAMIC mode 只覆盖 SELECT 出现的 dt,不动其他历史分区)
+- 回算窗口 N:本项目按业界主流 N=2(阿里 OneData / 字节 / 美团默认);金融 / 强稳定性场景可调到 N=3 或更大(kb/20 §7.3 通用兜底默认)
+- 重跑幂等:`INSERT OVERWRITE PARTITION (dt)` 动态分区
 - 调度依赖:DS DEPENDENT 同 dt ODS + 同 dt DIM ful_d 双前置(按 Kimball 维度退化原则,DWD 必依赖 DIM 同分区跑完)
 
 ---

+ 1 - 0
kb/92-重构进度.md

@@ -182,6 +182,7 @@
 | 2026-05-09 | **kb/27 §2.5 + §2.6 大改:dwd_pay 金额派生通式 + 业界全名命名 + 删 8 个状态/异步字段**:(a) §2.5 改"金额字段 mer_act% 修正规则":通式 `CASE WHEN point_type LIKE 'mer_act%' THEN ROUND(/100.00, 2) ELSE 直取 END` + 11 字段派生映射(accounts_payable→payable_amt_cny GMV / actual_payment+point→pay_amt_cny Net Revenue 特例 / trade_amount / settlement_amount / card_price / act_price / discount / platform_discount / act_discount / member_discount / point_deduct)+ Net Revenue 特例(mer_act% 时换字段用 point/100,非同字段除 100)+ 2 字段直取(shipping_cost / shipping_free_amount)+ 1 字段待业务确认(discount_amount 与 discount 区别)。(b) §2.6 字段表按业界全名 + `_amt_cny` 后缀重命名(pay_amt_cny / payable_amt_cny / settle_amt_cny / discount_amt_cny / platform_discount_amt_cny / member_discount_amt_cny / act_discount_amt_cny / point_deduct_amt_cny / shipping_amt_cny / shipping_free_amt_cny / trade_amt_cny),数量改 cnt 词根(purchase_cnt / give_cnt);按 Kimball / OneData "DWD 事件表只装事件本身的事实快照"原则删 8 字段(status / serve_status / update_time / refuse_status / waring_type / waring_status / goods_allocate / invoice_id),归 dim_trd_order_zip_d 拉链或后续业务事件表;保留 open_self(业务侧待确认归处,本期暂留);settlement_amount 保留并派生为 settle_amt_cny(_apd_d 不被退款改写)。 | — |
 | 2026-05-09 | **kb/27 discount→merchant_discount + discount_amount 加派生 + DWD 调度双前置**:(a) §2.5 派生映射 `discount` 加 merchant_ 前缀(`discount_amt_cny` → `merchant_discount_amt_cny`):业务库字段 `discount` 字面"折扣"未体现"商家方"语义,加全名 merchant_ 前缀突出商家维度(不抄业务侧 SQL 别名 mer_coupon,因公司"折扣 vs 券"是否同义未确认,待业务侧定)。(b) `discount_amount` 从"待问是否派生"拍板按通式派生,临时命名 `discount_amount_amt_cny ❓`,与 merchant_discount 语义区别 + 最终命名仍待业务答复。(c) §1.4 加调度依赖行:DS DEPENDENT 同 dt ODS + 同 dt DIM ful_d 双前置(按 Kimball 维度退化原则,DWD 必依赖 DIM 同分区跑完)。 | — |
 | 2026-05-10 | **kb/93 加 ADR-08 DIM ful_d 跑批:业界主流模式 B + 字段 CASE WHEN 整组判断**:本项目第一张 dim 表 dim_usr_user_ful_d 落地(commits 5a28815 / 305a63b / ad7925a / 5d09add / 7c6cb5e)后沉淀通用范式:(a) init 扫 ods 历史 dt<${dt} + ROW_NUMBER 取每 pk 最新版本落 dim.${pdt} 首日分区;(b) sche 今日 ods 增量 + 昨日 dim 基线 + UNION ALL(today_rebuilt 重 join + unchanged 直接保留);(c) 字段合并按"组"用 CASE WHEN(如 base 整组按 bu.id IS NOT NULL,cert 整组按 ci.user_id IS NOT NULL),不用字段级 COALESCE 避免业务库主动置 NULL 场景下昨日值错误兜底;(d) init 与 sche 同日上线 init 灌 ${pdt} + sche 写 ${dt} 链路打通。否决方案:每日全量重算(扫描成本高)/ FULL JOIN + COALESCE(NULL 不安全)/ 扫 ods 历史 + broadcast filter(IO 成本高)/ MERGE INTO(Spark 2.4 不支持,迁 Spark 3+ / Hudi / Iceberg / Delta Lake 后再用)。 | — |
+| 2026-05-10 | **dwd_pay sche 改回算近 2 日 + 删 order_type='group' 过滤**:(a) 删 init / sche 两处 `order_type='group'`:dwd_pay 是数仓支付明细不限定订单类型,原 SQL 把 temp.sql 业务侧"拼团 GMV"分析的 `order_type='group'` 限定误搬入 dwd 入仓 SQL。(b) sche 写入策略从"单分区 dt=T-1 不回算"改为"回算近 2 日(dt IN ${dt}/${pdt})":原"不回算"基于"业务库 update_time 与 payment_success_time 同步刷新 OLTP 契约"假设;用户拍板"后端不可信",按业界主流 N=2 兜底跨零点漂移(业务时间 T-1 但 update_time 漂到 T 的事件,T+1 跑批时通过扫 ods.dt=T 兜回)。INSERT OVERWRITE PARTITION (dt) 动态分区只覆盖 SELECT 出现的 dt(kb/26 §8 项目默认 DYNAMIC mode)。(c) kb/27 §1.4 写入策略段同步更新(原"不回算"改"回算近 2 日 N=2 业界主流")。本次反复经过:5/9 信契约不回算 → 5/10 编漂移场景反悔 → 复盘 sessions 5/9 T8-T10 发现"自破契约"违反共识 → 用户重新拍板"后端不可信"接受回算 → 业界范围 N=2/3 拍 N=2。 | — |
 | 2026-04-22 | **仓库改名 `tendata-warehouse-release` → `poyee-data-warehouse` 收尾**:项目根目录由用户手动改名完成;代码侧 `dw_base/utils/file_utils.py:9` + `dw_base/utils/hdfs_merge_small_file.py:7` 两处 `re.sub(r"tendata-warehouse.*", ...)` 字面量同步更新。`.idea/*.iml` / `modules.xml` / `workspace.xml` 因 `.idea` + `*.iml` 在 `.gitignore`,属本地 IDE 状态,不入库亦不影响运行(老 `tendata-warehouse-release.iml` + modules.xml / workspace.xml 里的 module name 残留不处理)。联动 kb/90 §1.1 L88 表格行打勾 + §2.3 末尾"与仓库改名的联动"段压缩为一行记录 | — |
 | 2026-04-21 | **聚簇 A.4 Spark 参数外配 + `spark_sql.py` 三级覆盖**:按业务调整频率拆两文件入库 —— `conf/spark-defaults.conf`(12 条底层行为/开关类,初始化后少改:`spark.sql.adaptive/broadcastTimeout/codegen/arrow*/files/statistics.*` + `spark.dynamicAllocation.enabled` + `spark.files.ignoreCorruptFiles` + `spark.debug.maxToStringFields` + `spark.port.maxRetries` + `hive.exec.orc.default.block.size`)+ `conf/spark-tuning.conf`(10 条资源/并行度/队列,业务早期常改:`spark.{driver,executor}.{memory,cores}` + `spark.executor.instances` + `spark.executor.memoryOverhead` + `spark.driver.maxResultSize` + `spark.default.parallelism` + `spark.sql.shuffle.partitions` + `spark.yarn.queue`)。`dw_base/spark/spark_sql.py` 改造:(a) 模块级新增 `_load_spark_conf_file(path)`,读 Spark 原生 `key value` 格式,支持 `#` 注释与空行,文件缺失返回 `{}` 容错单测;(b) `__init__` 10 个 tuning 相关构造参数默认值 `'2g' / 200 / ...` → `Optional[...] = None` sentinel,不破坏既有调用点显式传参;(c) `__init_spark_session` 原 22 条硬编码 `.config(...)` 链替换为三段:L1 先 `spark-defaults.conf` 后 `spark-tuning.conf`(相同 key tuning 覆盖 defaults)→ L2 `self._final_spark_config`(SQL 内 SET)→ L3 构造参数非 None 项 + `extra_spark_config`(L3 内 extra 覆盖 named),保持原"extra > SQL SET > named" 的向后兼容;日志分层打 `L1/L2/L3` 前缀便于排查。联动:`kb/90 §2.2` conf 结构加 `spark-tuning.conf` + `§2.3` 改写为两文件模型(去单文件草案)+ 删"坑 2"(B1 → A2 依赖边)+ 聚簇 L59 依赖边删 | — |
 | 2026-04-22 | **datasource 扁平化 + db_type 按父目录段判定(方向反转)**:反转 2026-04-15 起立项的"一套代码跑多环境"设计(kb/90 原 §2.5:`-env` 参数 + `datasource/{db_type}/{env}/{instance}.ini` 分层 + env 注入)。实际前期跨环境同步是常态(test 业务库 → prod HDFS),"全局 env"概念不成立。新方案:(a) source ini 落位 `datasource/{db_type}/{env}-{实例简称}.ini`,env 写进文件名(env ∈ dev/test/prod),扁平组织;(b) sync ini 内 `dataSource = {db_type}/{env}-{实例简称}`(强制带 db_type 前缀;旧裸名 `hdfs-ha` 不再支持);(c) 代码侧 `dw_base/datax/plugins/plugin.py:37` + `plugin_factory.py:34` 的 db_type 提取从"文件名按 `-` 切首词"改为"按 `/` 切首段"(父目录名)—— 旧算法在新文件名出现 `-` 时会把 `env` 误判为 db_type,必须改。联动:kb/00 §1 目录树重绘(去 env 子目录,加样例 `prod-hobby.ini` / `test-hobby.ini` / `prod-ha.ini` 等 + sync ini 引用形式说明段)+ §6.1 配置分类表落位描述、kb/02 §4 同步、kb/21 §3.9 过时 3 行修正(废 `-env` 注入表述;引用形式 `{db_type}/{env}-{实例简称}`;JSON 输出路径去 `{env}/` 层)、kb/90 §2.1 硬编码表合并 3 行为 1 行 + §2.5 大幅压缩(三阶段设计整段删,保留路径解耦部分)+ §2.8 末尾表行更新 + §八 状态表合并、kb/92 L90 checklist 落位改扁平。conf/templates/datasource/\*.template.ini × 3 头注释补"落位"+"dataSource 引用"两行。`bin/datax-job-config-generator.py` L5/L114/L115 的路径注释讲的是另一个 `conf/datax/config/` 作业分组层级,与本次 datasource 扁平化无关,不动;`dw_base/datax/plugins/reader/mysql_reader.py:178` `{group}/mysql-{database}` 生成逻辑是非活跃代码(批量采集未启动),留到 §2.7 重构时对齐 | — |

+ 1 - 2
manual/backfill/20260510_dwd_trd_order_pay_apd_d_init.sql

@@ -82,8 +82,7 @@ FROM (
                 ORDER BY COALESCE(update_time, create_time) DESC
             ) AS rn
         FROM ods.ods_trd_card_group_order_info_inc_d
-        WHERE order_type = 'group'
-          AND status IN (101, 103, 104, 105, 106, 301, 302)
+        WHERE status IN (101, 103, 104, 105, 106, 301, 302)
           AND payment_success_time IS NOT NULL
     ) t
     WHERE t.rn = 1