26-时间语义.md 5.4 KB

时间语义

1. 时间锚点与术语

T = 任务日 = sched 当天 = DS 工作流被 cron 触发的当天(补数模式下 = 补数目标日)。所有时间变量都基于 T 展开。

T-1 / T-2 / T+1 是相对偏移,无别名。

避免用"业务日"描述 raw / ods 的 dt:raw / ods 用 update_time 字段切片,update_time 是业务库的系统更新时间(应用层 last-modified),不是真业务时间。真业务时间(下单 / 交易 / 支付完成等)从 dwd 层开始引入。raw / ods 这一层不要混用"业务日"。

2. DS 时间变量底层

${system.biz.*} 系列(基于 sched):

变量 数值
${system.biz.curdate} T
${system.biz.date} T-1

$[yyyyMMdd±N] 系列(基于 sched 当天,不是基于 system.biz.date):

变量 数值
$[yyyyMMdd] T
$[yyyyMMdd+N] T+N
$[yyyyMMdd-N] T-N

⚠️ $[yyyyMMdd] 不是 ${system.biz.date} 的别名。这一点反复绊人,写表达式前先回这里核对。

数值上恒等关系${system.biz.date}$[yyyyMMdd-1] = T-1。两者实现路径不同但值相同,可互换;项目当前 ${dt}${system.biz.date} 是历史选择。

实测验证

DS SHELL 任务跑:

echo $[yyyyMMdd-2]
echo ${system.biz.date}
echo ${system.biz.curdate}
echo $[yyyyMMdd+1]

定时跑 7 号(sched=2026-05-07)输出:

20260505    # T-2
20260506    # T-1
20260507    # T
20260508    # T+1

补数 1 号(sched=2026-05-01)输出:

20260429    # T-2
20260430    # T-1
20260501    # T
20260502    # T+1

3. 项目自定义全局参数

DS 项目级 globalParams(poyee-data-warehouse 项目,所有工作流继承):

变量 DS 实现 数值
${cdt} ${system.biz.curdate} T
${dt} ${system.biz.date} T-1
${tdt} $[yyyyMMdd+1] T+1
${pdt} $[yyyyMMdd-2] T-2

四变量统一以 T 为锚点。命名仅为简短可读,不带业务语义。

4. raw 层 dt 语义

  • 抓窗update_time ∈ [${dt}, ${tdt}) = [T-1, T+1),48h 宽窗(raw ini reader.where)
  • 写入分区:dt = ${dt} = T-1
  • 48h 宽窗设计目的:覆盖跨日 update_time 漂移(详见 ADR-03 §零点漂移决策)
  • 重跑幂等:DataX hdfs writer 写 dt=T-1 单分区,INSERT OVERWRITE 单分区语义 → 同 sched 重跑结果一致

5. ods 层 dt 语义

  • 来源:raw 双源 union — WHERE dt IN ('${dt}', '${pdt}') 即 raw dt=T-1 + raw dt=T-2 两个分区
  • 过滤DATE_FORMAT(update_time, 'yyyyMMdd') = '${dt}',只保留 update_time 落在 T-1 那天的记录
  • 写入分区:动态分区 PARTITION (dt),行级 dt = DATE_FORMAT(update_time, 'yyyyMMdd'),配合过滤实际只写到 dt=T-1 一个分区
  • 跨日漂移修正:raw dt=T-2 因 48h 宽窗抓到的"漂到 T-1"的部分,被 union 进 ods dt=T-1(详见 ADR-03)
  • dedupeROW_NUMBER() OVER (PARTITION BY id, DATE_FORMAT(update_time, 'yyyyMMdd') ORDER BY update_time DESC) = 1,分区内取最新版本
  • 跨 ods dt 不去重:同 pk 多 dt 分区并存 = 上层拉链表(SCD Type 2)的底层
  • 重跑幂等前提spark.sql.sources.partitionOverwriteMode = DYNAMIC(必设;STATIC 模式动态分区会清空全表,违反幂等,详见 §8)

6. dwd / dws / ads 层 dt 语义

待后续设计落地后补充。

设计预期:dwd 引入真业务时间(订单 create_time / 支付 payment_success_time / 交易 trade_amount 时间等),dwd dt 锚点 ≠ ods dt(update_time 锚点),跨层会出现 dt 错位——同一笔订单在 ods 中按 update_time 进入 dt 分区,在 dwd 中按 create_time 进入 dt 分区,两者通常不同。

7. 补数(COMPLEMENT_DATA)语义

DS 补数把 sched 设为补数目标日,所有时间变量按补数日重算。补数与定时的变量取值规则完全一致,无特殊处理。

例:补 5/1 → sched=5/1 → cdt=5/1, dt=4/30, tdt=5/2, pdt=4/29 → raw 抓 [4/30, 5/2) 写 raw dt=4/30;ods 取 raw dt=4/30 + raw dt=4/29 → filter DATE(update_time)=4/30 → 写 ods dt=4/30。

8. 串行重跑 / 日期递增幂等

raw / ods 都按 "sched 唯一锚定 + INSERT OVERWRITE 单 dt 分区" 模式:

  • 任意 sched 重跑:覆盖该 sched 对应的单个 dt 分区,不影响其他 dt
  • 日期递增串行(sched=5/1, 5/2, 5/3 ... 依次跑):每次写自己的 dt 分区,互不干扰
  • ✓ 幂等,✓ 任意顺序

Spark partitionOverwriteMode 必读

Spark 2.x 默认 spark.sql.sources.partitionOverwriteMode = STATIC。本项目 ods 用动态分区 PARTITION (dt) + SELECT 带出 dt 列,必须显式改为 DYNAMIC

模式 动态分区 INSERT OVERWRITE 行为
STATIC(Spark 2.x 默认) 覆盖整张表所有分区,没在 SELECT 出现的历史 dt 全部消失
DYNAMIC 只覆盖 SELECT 实际产生的那些 dt,其他历史分区保留

STATIC 模式下日常调度跑一次就清空 ods 全表,每天只剩当天一个 dt——彻底反幂等。DYNAMIC 是本项目 ods 调度的硬前提

PARTITION 子句静态指定值(如 PARTITION (dt='20260507'))不受此模式影响——无论 STATIC / DYNAMIC 都只覆盖指定的那个分区。

幂等测试(入仓守护)

幂等性必须由 tests/ 下集成测试守护:跑两次(或日期递增多次)验证 dt 分区集合一致、行数一致。具体测试设计见后续阶段。