Преглед на файлове

docs(kb/93): 加 ADR-09 DWD 事件表跑批回算窗口 N=2

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu преди 3 дни
родител
ревизия
8a40f23b3b
променени са 2 файла, в които са добавени 43 реда и са изтрити 0 реда
  1. 1 0
      kb/92-重构进度.md
  2. 42 0
      kb/93-架构决策.md

+ 1 - 0
kb/92-重构进度.md

@@ -183,6 +183,7 @@
 | 2026-05-09 | **kb/27 discount→merchant_discount + discount_amount 加派生 + DWD 调度双前置**:(a) §2.5 派生映射 `discount` 加 merchant_ 前缀(`discount_amt_cny` → `merchant_discount_amt_cny`):业务库字段 `discount` 字面"折扣"未体现"商家方"语义,加全名 merchant_ 前缀突出商家维度(不抄业务侧 SQL 别名 mer_coupon,因公司"折扣 vs 券"是否同义未确认,待业务侧定)。(b) `discount_amount` 从"待问是否派生"拍板按通式派生,临时命名 `discount_amount_amt_cny ❓`,与 merchant_discount 语义区别 + 最终命名仍待业务答复。(c) §1.4 加调度依赖行:DS DEPENDENT 同 dt ODS + 同 dt DIM ful_d 双前置(按 Kimball 维度退化原则,DWD 必依赖 DIM 同分区跑完)。 | — |
 | 2026-05-10 | **kb/93 加 ADR-08 DIM ful_d 跑批:业界主流模式 B + 字段 CASE WHEN 整组判断**:本项目第一张 dim 表 dim_usr_user_ful_d 落地(commits 5a28815 / 305a63b / ad7925a / 5d09add / 7c6cb5e)后沉淀通用范式:(a) init 扫 ods 历史 dt<${dt} + ROW_NUMBER 取每 pk 最新版本落 dim.${pdt} 首日分区;(b) sche 今日 ods 增量 + 昨日 dim 基线 + UNION ALL(today_rebuilt 重 join + unchanged 直接保留);(c) 字段合并按"组"用 CASE WHEN(如 base 整组按 bu.id IS NOT NULL,cert 整组按 ci.user_id IS NOT NULL),不用字段级 COALESCE 避免业务库主动置 NULL 场景下昨日值错误兜底;(d) init 与 sche 同日上线 init 灌 ${pdt} + sche 写 ${dt} 链路打通。否决方案:每日全量重算(扫描成本高)/ FULL JOIN + COALESCE(NULL 不安全)/ 扫 ods 历史 + broadcast filter(IO 成本高)/ MERGE INTO(Spark 2.4 不支持,迁 Spark 3+ / Hudi / Iceberg / Delta Lake 后再用)。 | — |
 | 2026-05-10 | **dwd_pay sche 改回算近 2 日 + 删 order_type='group' 过滤**:(a) 删 init / sche 两处 `order_type='group'`:dwd_pay 是数仓支付明细不限定订单类型,原 SQL 把 temp.sql 业务侧"拼团 GMV"分析的 `order_type='group'` 限定误搬入 dwd 入仓 SQL。(b) sche 写入策略从"单分区 dt=T-1 不回算"改为"回算近 2 日(dt IN ${dt}/${pdt})":原"不回算"基于"业务库 update_time 与 payment_success_time 同步刷新 OLTP 契约"假设;用户拍板"后端不可信",按业界主流 N=2 兜底跨零点漂移(业务时间 T-1 但 update_time 漂到 T 的事件,T+1 跑批时通过扫 ods.dt=T 兜回)。INSERT OVERWRITE PARTITION (dt) 动态分区只覆盖 SELECT 出现的 dt(kb/26 §8 项目默认 DYNAMIC mode)。(c) kb/27 §1.4 写入策略段同步更新(原"不回算"改"回算近 2 日 N=2 业界主流")。本次反复经过:5/9 信契约不回算 → 5/10 编漂移场景反悔 → 复盘 sessions 5/9 T8-T10 发现"自破契约"违反共识 → 用户重新拍板"后端不可信"接受回算 → 业界范围 N=2/3 拍 N=2。 | — |
+| 2026-05-10 | **kb/93 加 ADR-09 DWD 事件表跑批回算窗口 N=2**:本项目第一张 dwd 事件表 dwd_trd_order_pay_apd_d 落地反复讨论后(5/9 信契约不回算 → 5/10 编漂移场景 → 自破契约 → 拍 N=2,commits 5a28815/a9b6eaa)定下范式。决策:sched=T 时扫 ods.dt IN (${dt},${pdt}) + 过滤业务时间 + INSERT OVERWRITE PARTITION (dt) 动态分区写 dwd.dt IN (${dt},${pdt});T+1 跑批时通过扫 ods.dt=T 兜回业务时间 T-1 漂移到 ods.dt=T 的事件重写 dwd.dt=T-1。理由:ods 已按 update_time 严格归位但业务库 OLTP 业务事件时间 / update_time 不一定严格同步(跨零点支付 600ms 延迟漂移到 ods.dt=T 单分区扫不到)。N=2 业界主流(阿里 OneData / 字节 / 美团默认)。否决:N=1 不回算(信契约不安全)/ N=3 保守 / N=7 极端 / MERGE INTO(Spark 2.4 不支持)/ 延后 1 天跑批(数据延迟不可接受)。反悔:业务库延迟 > 1 天频繁→上调 N / 迁 Spark 3+ → MERGE INTO / OLTP 强契约保证→降 N=1(实际很难保证)。 | — |
 | 2026-04-22 | **仓库改名 `tendata-warehouse-release` → `poyee-data-warehouse` 收尾**:项目根目录由用户手动改名完成;代码侧 `dw_base/utils/file_utils.py:9` + `dw_base/utils/hdfs_merge_small_file.py:7` 两处 `re.sub(r"tendata-warehouse.*", ...)` 字面量同步更新。`.idea/*.iml` / `modules.xml` / `workspace.xml` 因 `.idea` + `*.iml` 在 `.gitignore`,属本地 IDE 状态,不入库亦不影响运行(老 `tendata-warehouse-release.iml` + modules.xml / workspace.xml 里的 module name 残留不处理)。联动 kb/90 §1.1 L88 表格行打勾 + §2.3 末尾"与仓库改名的联动"段压缩为一行记录 | — |
 | 2026-04-21 | **聚簇 A.4 Spark 参数外配 + `spark_sql.py` 三级覆盖**:按业务调整频率拆两文件入库 —— `conf/spark-defaults.conf`(12 条底层行为/开关类,初始化后少改:`spark.sql.adaptive/broadcastTimeout/codegen/arrow*/files/statistics.*` + `spark.dynamicAllocation.enabled` + `spark.files.ignoreCorruptFiles` + `spark.debug.maxToStringFields` + `spark.port.maxRetries` + `hive.exec.orc.default.block.size`)+ `conf/spark-tuning.conf`(10 条资源/并行度/队列,业务早期常改:`spark.{driver,executor}.{memory,cores}` + `spark.executor.instances` + `spark.executor.memoryOverhead` + `spark.driver.maxResultSize` + `spark.default.parallelism` + `spark.sql.shuffle.partitions` + `spark.yarn.queue`)。`dw_base/spark/spark_sql.py` 改造:(a) 模块级新增 `_load_spark_conf_file(path)`,读 Spark 原生 `key value` 格式,支持 `#` 注释与空行,文件缺失返回 `{}` 容错单测;(b) `__init__` 10 个 tuning 相关构造参数默认值 `'2g' / 200 / ...` → `Optional[...] = None` sentinel,不破坏既有调用点显式传参;(c) `__init_spark_session` 原 22 条硬编码 `.config(...)` 链替换为三段:L1 先 `spark-defaults.conf` 后 `spark-tuning.conf`(相同 key tuning 覆盖 defaults)→ L2 `self._final_spark_config`(SQL 内 SET)→ L3 构造参数非 None 项 + `extra_spark_config`(L3 内 extra 覆盖 named),保持原"extra > SQL SET > named" 的向后兼容;日志分层打 `L1/L2/L3` 前缀便于排查。联动:`kb/90 §2.2` conf 结构加 `spark-tuning.conf` + `§2.3` 改写为两文件模型(去单文件草案)+ 删"坑 2"(B1 → A2 依赖边)+ 聚簇 L59 依赖边删 | — |
 | 2026-04-22 | **datasource 扁平化 + db_type 按父目录段判定(方向反转)**:反转 2026-04-15 起立项的"一套代码跑多环境"设计(kb/90 原 §2.5:`-env` 参数 + `datasource/{db_type}/{env}/{instance}.ini` 分层 + env 注入)。实际前期跨环境同步是常态(test 业务库 → prod HDFS),"全局 env"概念不成立。新方案:(a) source ini 落位 `datasource/{db_type}/{env}-{实例简称}.ini`,env 写进文件名(env ∈ dev/test/prod),扁平组织;(b) sync ini 内 `dataSource = {db_type}/{env}-{实例简称}`(强制带 db_type 前缀;旧裸名 `hdfs-ha` 不再支持);(c) 代码侧 `dw_base/datax/plugins/plugin.py:37` + `plugin_factory.py:34` 的 db_type 提取从"文件名按 `-` 切首词"改为"按 `/` 切首段"(父目录名)—— 旧算法在新文件名出现 `-` 时会把 `env` 误判为 db_type,必须改。联动:kb/00 §1 目录树重绘(去 env 子目录,加样例 `prod-hobby.ini` / `test-hobby.ini` / `prod-ha.ini` 等 + sync ini 引用形式说明段)+ §6.1 配置分类表落位描述、kb/02 §4 同步、kb/21 §3.9 过时 3 行修正(废 `-env` 注入表述;引用形式 `{db_type}/{env}-{实例简称}`;JSON 输出路径去 `{env}/` 层)、kb/90 §2.1 硬编码表合并 3 行为 1 行 + §2.5 大幅压缩(三阶段设计整段删,保留路径解耦部分)+ §2.8 末尾表行更新 + §八 状态表合并、kb/92 L90 checklist 落位改扁平。conf/templates/datasource/\*.template.ini × 3 头注释补"落位"+"dataSource 引用"两行。`bin/datax-job-config-generator.py` L5/L114/L115 的路径注释讲的是另一个 `conf/datax/config/` 作业分组层级,与本次 datasource 扁平化无关,不动;`dw_base/datax/plugins/reader/mysql_reader.py:178` `{group}/mysql-{database}` 生成逻辑是非活跃代码(批量采集未启动),留到 §2.7 重构时对齐 | — |

+ 42 - 0
kb/93-架构决策.md

@@ -353,3 +353,45 @@
   - 迁 Spark 3+ / Iceberg / Hudi / Delta Lake 后改 MERGE INTO(更优)
   - dim 表数据量增长导致 sche shuffle 性能下降,评估改"分区裁剪 + 局部重算"
   - 业务库消除"主动置 NULL" 场景后,可放宽到字段级 COALESCE 简化写法
+
+### ADR-09 DWD 事件表跑批回算窗口 N=2
+
+- **状态**:已采纳
+
+  本项目 ods 层按 ADR-03 严格按 `update_time` 归位 dt(双源 union 完整捕获 `update_time ∈ [T-1, T)` 范围所有版本),dwd 事件表理论上单分区扫 `ods.dt=T-1` 即可拿到所有"业务时间 T-1 + update_time T-1"的事件。
+
+  但业务库 OLTP 写入业务事件时间(如 `payment_success_time`)与 `update_time` 不一定严格同步刷新——典型场景:跨零点支付,`payment_success_time = T-1 23:59:59.500`,业务库 ORM 延迟 600ms 刷 `update_time = T 00:00:00.100`,该行落 `ods.dt=T`;事件本应归 `dwd.dt=T-1`,但 dwd 单分区扫 `ods.dt=T-1` 拿不到。这种漂移在业务高峰跨零点场景下不可忽略。
+
+  本项目第一张 dwd 事件表 `dwd_trd_order_pay_apd_d` 落地反复讨论后定下范式(5/9 一度采纳"信 OLTP 契约不回算" → 5/10 自破契约编漂移场景 → 复盘 sessions 5/9 T8-T10 发现违反共识 → 用户重新拍"后端不可信"→ 业界范围 N=2/3 拍 N=2 → commit `a9b6eaa`)。
+
+- **决策**:DWD 事件表跑批回算近 N=2 日:
+
+  - sche sched=T 时,扫 `ods.dt IN (${dt}, ${pdt})`(即 T-1 + T-2)
+  - 过滤业务时间 `DATE(business_event_time) IN (${dt}, ${pdt})`(如 dwd_pay 用 `payment_success_time`)
+  - 写入 dwd `dt IN (${dt}, ${pdt})`:`INSERT OVERWRITE PARTITION (dt)` 动态分区,kb/26 §8 项目默认 DYNAMIC mode 只覆盖 SELECT 出现的 dt,不动其他历史分区
+  - `dwd.dt=T-1` 在 sched=T 跑批时首次写入;sched=T+1 跑批时通过扫 `ods.dt=T` 兜回业务时间 T-1 漂移到 `ods.dt=T` 的事件,重写 `dwd.dt=T-1`
+  - N=2 业界主流(阿里 OneData / 字节 / 美团默认)
+
+- **后果**:
+
+  - 正面:
+    - 兜底跨零点 ods 漂移(业务库 `update_time` 不严格同步刷新场景)
+    - 不依赖 OLTP 应用层契约的强假设
+    - 与 kb/20 §7.3 通用兜底一致
+  - 负面:
+    - 每个 dt 分区被回算 2 次(首次 + 次日兜底),写入 / 计算成本上升 2×
+    - 动态分区写入需注意 dynamic mode 默认行为(kb/26 §8 已实测)
+
+- **候选方案**:
+
+  - **N=1 单分区不回算**(5/9 一度采纳):基于"业务库 update_time 与业务事件时间同步刷新 OLTP 契约"假设;OLTP 后端实际不可信,跨零点漂移会永久丢失事件——否决
+  - **N=3 回算近 3 日**:偏保守,金融 / 强稳定性场景;本项目业务侧无此强稳定性需求,2× 写入对齐业界默认即可——否决但留作"业务库延迟 > 1 天频繁时上调"反悔触发
+  - **N=7 回算近 7 日**:极端保守,少见——否决
+  - **MERGE INTO**(Spark 3+ / Iceberg / Hudi / Delta Lake):不需回算逻辑,引擎原生 upsert 处理漂移——本项目 Spark 2.4 不支持,本阶段不取
+  - **延后 1 天跑批**(sched=T+1 写 dwd.dt=T-1,扫 ods.dt IN (T-1, T)):dwd 数据延迟 1 天可用,下游标签延迟,业务侧不可接受——否决
+
+- **反悔条件**:
+
+  - 业务库延迟 > 1 天的漂移场景频繁出现(如系统升级 / 故障恢复多日数据回流),N 上调到 3 或更大
+  - 迁 Spark 3+ / Iceberg / Hudi / Delta Lake 后改 MERGE INTO(更优)
+  - 业务侧能严格保证 OLTP 应用层契约(update_time 与事件时间同步刷新),可降到 N=1(实际很难保证)