瀏覽代碼

docs(kb): 加 kb/26 时间语义 + ADR-03 交叉引用

T 任务日作为唯一时间锚点,cdt/dt/tdt/pdt 数值映射 + DS 变量底层($[yyyyMMdd]
基于 sched 当天而非 system.biz.date 的反复绊人点)+ raw 48h 宽窗 / ods 双源
union 动态分区 + STATIC vs DYNAMIC 区分(DYNAMIC 是 ods 硬前提)+ 补数与串行
重跑幂等条件统一沉淀。避开"业务日"误称,留给 dwd 引入真业务时间。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 1 天之前
父節點
當前提交
7d97935d24
共有 4 個文件被更改,包括 128 次插入0 次删除
  1. 1 0
      README.md
  2. 124 0
      kb/26-时间语义.md
  3. 1 0
      kb/92-重构进度.md
  4. 2 0
      kb/93-架构决策.md

+ 1 - 0
README.md

@@ -88,6 +88,7 @@ PG/ES ──DataX(raw)──> RAW ──> ODS ──> DWD ──> DWS ──> TD
 | [23-标签体系](kb/23-标签体系.md) | **TDM 画像**:用户 / 商品 / 商家标签 |
 | [24-raw 建模](kb/24-raw建模.md) | **raw 层字段裁剪决策**:各 raw 表三态标记(保留 / 明性裁 / 隐性裁)+ 决策理由溯源 |
 | [25-raw 接入流程](kb/25-raw接入流程.md) | **新表入仓标准 8 步**:sync 生成器 + mask 配置 + DDL 生成器 + raw 建模文档协同 |
+| [26-时间语义](kb/26-时间语义.md) | **时间变量约定**:T 任务日锚点 + DS 变量底层 + cdt/dt/tdt/pdt 项目参数 + raw/ods 各层 dt 语义 + 重跑幂等条件 |
 
 ### 3x 开发流程
 

+ 124 - 0
kb/26-时间语义.md

@@ -0,0 +1,124 @@
+# 时间语义
+
+## 1. 时间锚点与术语
+
+**T = 任务日 = sched 当天 = DS 工作流被 cron 触发的当天**(补数模式下 = 补数目标日)。所有时间变量都基于 T 展开。
+
+**T-1 / T-2 / T+1** 是相对偏移,无别名。
+
+**避免用"业务日"描述 raw / ods 的 dt**:raw / ods 用 `update_time` 字段切片,update_time 是业务库的**系统更新时间**(应用层 last-modified),不是真业务时间。真业务时间(下单 / 交易 / 支付完成等)从 dwd 层开始引入。raw / ods 这一层不要混用"业务日"。
+
+## 2. DS 时间变量底层
+
+`${system.biz.*}` 系列(基于 sched):
+
+| 变量 | 数值 |
+|---|---|
+| `${system.biz.curdate}` | T |
+| `${system.biz.date}` | T-1 |
+
+`$[yyyyMMdd±N]` 系列(**基于 sched 当天,不是基于 system.biz.date**):
+
+| 变量 | 数值 |
+|---|---|
+| `$[yyyyMMdd]` | T |
+| `$[yyyyMMdd+N]` | T+N |
+| `$[yyyyMMdd-N]` | T-N |
+
+⚠️ `$[yyyyMMdd]` 不是 `${system.biz.date}` 的别名。这一点反复绊人,写表达式前先回这里核对。
+
+### 实测验证
+
+DS SHELL 任务跑:
+
+```bash
+echo $[yyyyMMdd-2]
+echo ${system.biz.date}
+echo ${system.biz.curdate}
+echo $[yyyyMMdd+1]
+```
+
+定时跑 7 号(sched=2026-05-07)输出:
+
+```
+20260505    # T-2
+20260506    # T-1
+20260507    # T
+20260508    # T+1
+```
+
+补数 1 号(sched=2026-05-01)输出:
+
+```
+20260429    # T-2
+20260430    # T-1
+20260501    # T
+20260502    # T+1
+```
+
+## 3. 项目自定义全局参数
+
+DS 项目级 globalParams(poyee-data-warehouse 项目,所有工作流继承):
+
+| 变量 | DS 实现 | 数值 |
+|---|---|---|
+| `${cdt}` | `${system.biz.curdate}` | T |
+| `${dt}` | `${system.biz.date}` | T-1 |
+| `${tdt}` | `$[yyyyMMdd+1]` | T+1 |
+| `${pdt}` | `$[yyyyMMdd-2]` | T-2 |
+
+四变量统一以 T 为锚点。命名仅为简短可读,不带业务语义。
+
+## 4. raw 层 dt 语义
+
+- **抓窗**:`update_time ∈ [${dt}, ${tdt}) = [T-1, T+1)`,48h 宽窗(raw ini reader.where)
+- **写入分区**:dt = `${dt}` = T-1
+- **48h 宽窗设计目的**:覆盖跨日 update_time 漂移(详见 ADR-03 §零点漂移决策)
+- **重跑幂等**:DataX hdfs writer 写 `dt=T-1` 单分区,INSERT OVERWRITE 单分区语义 → 同 sched 重跑结果一致
+
+## 5. ods 层 dt 语义
+
+- **来源**:raw 双源 union — `WHERE dt IN ('${dt}', '${pdt}')` 即 raw dt=T-1 + raw dt=T-2 两个分区
+- **过滤**:`DATE_FORMAT(update_time, 'yyyyMMdd') = '${dt}'`,只保留 update_time 落在 T-1 那天的记录
+- **写入分区**:动态分区 `PARTITION (dt)`,行级 dt = `DATE_FORMAT(update_time, 'yyyyMMdd')`,配合过滤实际只写到 dt=T-1 一个分区
+- **跨日漂移修正**:raw dt=T-2 因 48h 宽窗抓到的"漂到 T-1"的部分,被 union 进 ods dt=T-1(详见 ADR-03)
+- **dedupe**:`ROW_NUMBER() OVER (PARTITION BY id, DATE_FORMAT(update_time, 'yyyyMMdd') ORDER BY update_time DESC) = 1`,分区内取最新版本
+- **跨 ods dt 不去重**:同 pk 多 dt 分区并存 = 上层拉链表(SCD Type 2)的底层
+- **重跑幂等前提**:`spark.sql.sources.partitionOverwriteMode = DYNAMIC`(必设;STATIC 模式动态分区会清空全表,违反幂等,详见 §8)
+
+## 6. dwd / dws / ads 层 dt 语义
+
+待后续设计落地后补充。
+
+设计预期:dwd 引入真业务时间(订单 create_time / 支付 payment_success_time / 交易 trade_amount 时间等),dwd dt 锚点 ≠ ods dt(update_time 锚点),跨层会出现 dt 错位——同一笔订单在 ods 中按 update_time 进入 dt 分区,在 dwd 中按 create_time 进入 dt 分区,两者通常不同。
+
+## 7. 补数(COMPLEMENT_DATA)语义
+
+DS 补数把 sched 设为补数目标日,所有时间变量按补数日重算。补数与定时的变量取值规则**完全一致**,无特殊处理。
+
+例:补 5/1 → sched=5/1 → cdt=5/1, dt=4/30, tdt=5/2, pdt=4/29 → raw 抓 `[4/30, 5/2)` 写 raw dt=4/30;ods 取 raw dt=4/30 + raw dt=4/29 → filter DATE(update_time)=4/30 → 写 ods dt=4/30。
+
+## 8. 串行重跑 / 日期递增幂等
+
+raw / ods 都按 "sched 唯一锚定 + INSERT OVERWRITE 单 dt 分区" 模式:
+
+- 任意 sched 重跑:覆盖该 sched 对应的单个 dt 分区,不影响其他 dt
+- 日期递增串行(sched=5/1, 5/2, 5/3 ... 依次跑):每次写自己的 dt 分区,互不干扰
+- ✓ 幂等,✓ 任意顺序
+
+### Spark `partitionOverwriteMode` 必读
+
+Spark 2.x 默认 `spark.sql.sources.partitionOverwriteMode = STATIC`。本项目 ods 用动态分区 `PARTITION (dt)` + SELECT 带出 dt 列,**必须显式改为 DYNAMIC**:
+
+| 模式 | 动态分区 INSERT OVERWRITE 行为 |
+|---|---|
+| STATIC(Spark 2.x 默认) | 覆盖整张表所有分区,没在 SELECT 出现的历史 dt 全部消失 |
+| DYNAMIC | 只覆盖 SELECT 实际产生的那些 dt,其他历史分区保留 |
+
+STATIC 模式下日常调度跑一次就清空 ods 全表,每天只剩当天一个 dt——彻底反幂等。**DYNAMIC 是本项目 ods 调度的硬前提**。
+
+PARTITION 子句静态指定值(如 `PARTITION (dt='20260507')`)不受此模式影响——无论 STATIC / DYNAMIC 都只覆盖指定的那个分区。
+
+### 幂等测试(入仓守护)
+
+幂等性必须由 tests/ 下集成测试守护:跑两次(或日期递增多次)验证 dt 分区集合一致、行数一致。具体测试设计见后续阶段。

+ 1 - 0
kb/92-重构进度.md

@@ -201,3 +201,4 @@
 | 2026-04-29 | **`datax-gc-generator` 从零重写为 `datax-sync-template-gen`(kb/90 §2.7 完成)**:(a) 老 798 行 `bin/datax-gc-generator.py` 整文件删(mysql 链路 + 6 种 from × to 组合,全项目零外部引用);(b) 新 `bin/datax-sync-template-gen.py`(200+ 行)—— 仅 PG → HDFS,按 sync ini `dataSource` 字段格式 `postgresql/{env}-{instance}` 走 `../datasource/{ref}.ini`,复用 `DataSourceFactory` + `PostgreSQLDataSource` 拿 jdbc/user/pwd,pg8000 直连查 `pg_catalog.pg_attribute` 拿全字段 + 注释,查 `pg_index` 单字段 PK,渲染 sync ini 模板(全字段 column / where 用 update_time / writer.path 用源表名 `_TODO_d` 占位 / `-o` nargs='?' 三态:不传 stdout / 不带值 `workspace/{yyyymmdd}/` / 带值自定义);(c) `requirements.txt` 加 `pg8000~=1.30`;(d) 单测 `tests/unit/datax/test_sync_template_gen.py` 9 条覆盖渲染 + JDBC 解析 + PK auto-detect 边界(mock conn);(e) 配套死代码清理(kb/90 §2.7 拆除清单 scope 收紧):`dw_base/datax/datax_utils.py` + `dw_base/common/template_constants.py` 两整文件删(零外部引用),`mysql_reader.py` 删 `generate_hive_ddl` + `generate_hive_over_hbase_ddl` 两方法 + 顶部 `template_constants` import;**保留**:`mysql_data_source.py` / `mysql_reader.py` 整文件 / `mysql_writer.py` / `data_source_factory.py` mysql 分支 / `plugin_factory.py` mysql 分支 / `MYSQL_KEYWORDS` / `generate_definition`(mysql 运行时同步代码,与 PG 同等地位,未来 mysql 同步可能用,scope 收紧 vs 老拆除清单) | — |
 | 2026-05-05 | **kb/01 §5 集群层外部 jar 节新建**:登记 `elasticsearch-hadoop-7.17.25.jar`(落点 `/opt/cloudera/parcels/CDH/lib/spark/jars/`,所有节点,运维分发,不入 git),ES 集群版本 7.17.25 | — |
 | 2026-05-07 | **ods 启动(ADR-06 ods 部分 ✅ + kb/30 §4.4 新增)**:(a) ADR-06 状态草案 → 已采纳,标题日期补 ods ✅ 2026-05-06;ods 层定稿条款写入:类型映射(int 系→BIGINT、numeric→DECIMAL(20,4)、timestamp→TIMESTAMP、其他无争议)、is_deleted BOOLEAN 软删归一、不加技术字段(反悔早先 etl_time/src_sys/src_tbl 三件套);反悔条件加 2 条(跨时区集群迁移、numeric 高精度场景)。(b) 新建 `conf/pg-to-hive-type.ini` 类型映射 conf。(c) `bin/hive-ddl-gen.py` 加 -l ods 分支:normalize_pg_type / load_type_mapping / map_pg_to_hive / reverse_ods_table_name / render_ods_ddl 5 函数;ods DDL 末尾加 is_deleted;不加技术字段;LOCATION /user/hive/warehouse/ods.db/。(d) 单测 `tests/unit/datax/test_hive_ddl_gen.py` 扩展 17 个 ods 测试,33 全过。(e) 跑生成器出 8 张 ods DDL 落 `manual/ddl/ods/{trd,usr,shp,prd}/`,作者统一 tianyu.chu 头其他 TODO 字段保留。(f) kb/30 新增 §4.4 ods 层(类型转换 + 软删归一)整节,节号重排 4.4 ads → 4.5、4.5 文件命名速查 → 4.6、4.6 表结构变更 → 4.7,line 480 内部引用 §4.6 → §4.7 同步 | — |
+| 2026-05-07 | **ods 阶段二(同步 SQL × 8 + 订单 init)+ kb/26 时间语义新文档**:(a) `jobs/ods/{trd,usr,shp,prd}/<表>_inc_d.sql` × 8 调度 SQL + `manual/backfill/ods_trd_card_group_order_info_init.sql` 订单 his_o + inc_d 一次性灌入 SQL;按 ADR-03 双源 union (raw dt=${dt} + raw dt=${pdt}) + `DATE_FORMAT(update_time)=${dt}` 过滤 + (id, ods_dt) dedupe + 动态分区写入;ods 8 表 DDL COMMENT 填业务名 + 状态改 [已执行]。(b) `kb/26-时间语义.md` 新建:T 任务日为唯一时间锚点;DS 变量底层 + 反复绊人点(`$[yyyyMMdd]` 基于 sched 当天而非 `${system.biz.date}`)+ 实测 echo 输出对照;项目级 globalParams 表(cdt=T / dt=T-1 / tdt=T+1 / pdt=T-2);raw 48h 宽窗 + ods 双源 union 各层 dt 语义 + 跨日漂移修正;补数语义 + 串行重跑幂等矩阵 + STATIC vs DYNAMIC 区分(DYNAMIC 是 ods 调度硬前提);避开"业务日"误称(raw/ods 用 update_time = 业务库系统时间不是真业务时间,dwd 才引入真业务时间)。(c) ADR-03 标题下加交叉引用 → kb/26;README 索引 2x 组加 kb/26 行 | — |

+ 2 - 0
kb/93-架构决策.md

@@ -53,6 +53,8 @@
 
 ### ADR-03 零点漂移决策
 
+> 时间变量定义(cdt/dt/tdt/pdt 数值映射)+ 各层 dt 重跑幂等条件 详见 [kb/26-时间语义](26-时间语义.md)。
+
 - **状态**:草案
   T+1 按 `update_time` 过滤单日窗口 `[day-start, day-stop)` 同步时,源库在同步执行期间持续写入——跨零点的记录 `update_time` 会从 N 号"漂到" N+1 号,单日窗口**无法捕获漂移记录**。