26-时间语义.md 5.8 KB

时间语义

1. 时间锚点与术语

T = 任务日 = sched 当天 = DS 工作流被 cron 触发的当天(补数模式下 = 补数目标日)。所有时间变量都基于 T 展开。

T-1 / T-2 / T+1 是相对偏移,无别名。

避免用"业务日"描述 raw / ods 的 dt:raw / ods 用 update_time 字段切片,update_time 是业务库的系统更新时间(应用层 last-modified),不是真业务时间。真业务时间(下单 / 交易 / 支付完成等)从 dwd 层开始引入。raw / ods 这一层不要混用"业务日"。

2. DS 时间变量底层

${system.biz.*} 系列(基于 sched):

变量 数值
${system.biz.curdate} T
${system.biz.date} T-1

$[yyyyMMdd±N] 系列(基于 sched 当天,不是基于 system.biz.date):

变量 数值
$[yyyyMMdd] T
$[yyyyMMdd+N] T+N
$[yyyyMMdd-N] T-N

⚠️ $[yyyyMMdd] 不是 ${system.biz.date} 的别名。这一点反复绊人,写表达式前先回这里核对。

数值上恒等关系${system.biz.date}$[yyyyMMdd-1] = T-1。两者实现路径不同但值相同,可互换;项目当前 ${dt}${system.biz.date} 是历史选择。

实测验证

DS SHELL 任务跑:

echo $[yyyyMMdd-2]
echo ${system.biz.date}
echo ${system.biz.curdate}
echo $[yyyyMMdd+1]

定时跑 7 号(sched=2026-05-07)输出:

20260505    # T-2
20260506    # T-1
20260507    # T
20260508    # T+1

补数 1 号(sched=2026-05-01)输出:

20260429    # T-2
20260430    # T-1
20260501    # T
20260502    # T+1

3. 项目自定义全局参数

DS 项目级 globalParams(poyee-data-warehouse 项目,所有工作流继承):

变量 DS 实现 数值
${cdt} ${system.biz.curdate} T
${dt} ${system.biz.date} T-1
${tdt} $[yyyyMMdd+1] T+1
${pdt} $[yyyyMMdd-2] T-2

四变量统一以 T 为锚点。命名仅为简短可读,不带业务语义。

4. raw 层 dt 语义

  • 抓窗update_time ∈ [${dt}, ${tdt}) = [T-1, T+1),48h 宽窗(raw ini reader.where)
  • 写入分区:dt = ${dt} = T-1
  • 48h 宽窗设计目的:覆盖跨日 update_time 漂移(详见 ADR-03 §零点漂移决策)
  • 重跑幂等:DataX hdfs writer 写 dt=T-1 单分区,INSERT OVERWRITE 单分区语义 → 同 sched 重跑结果一致

5. ods 层 dt 语义

  • 来源:raw 双源 union — WHERE dt IN ('${dt}', '${pdt}') 即 raw dt=T-1 + raw dt=T-2 两个分区
  • 过滤DATE_FORMAT(update_time, 'yyyyMMdd') = '${dt}',只保留 update_time 落在 T-1 那天的记录
  • 写入分区:动态分区 PARTITION (dt),行级 dt = DATE_FORMAT(update_time, 'yyyyMMdd'),配合过滤实际只写到 dt=T-1 一个分区
  • 跨日漂移修正:raw dt=T-2 因 48h 宽窗抓到的"漂到 T-1"的部分,被 union 进 ods dt=T-1(详见 ADR-03)
  • dedupeROW_NUMBER() OVER (PARTITION BY id, DATE_FORMAT(update_time, 'yyyyMMdd') ORDER BY update_time DESC) = 1,分区内取最新版本
  • 跨 ods dt 不去重:同 pk 多 dt 分区并存 = 上层拉链表(SCD Type 2)的底层
  • 重跑幂等:动态分区 INSERT OVERWRITE 只覆盖 SELECT 出现的 dt,其他历史 dt 保留(实测见 §8 + tests/integration/spark/idempotence/)

6. dwd / dws / ads 层 dt 语义

待后续设计落地后补充。

设计预期:dwd 引入真业务时间(订单 create_time / 支付 payment_success_time / 交易 trade_amount 时间等),dwd dt 锚点 ≠ ods dt(update_time 锚点),跨层会出现 dt 错位——同一笔订单在 ods 中按 update_time 进入 dt 分区,在 dwd 中按 create_time 进入 dt 分区,两者通常不同。

7. 补数(COMPLEMENT_DATA)语义

DS 补数把 sched 设为补数目标日,所有时间变量按补数日重算。补数与定时的变量取值规则完全一致,无特殊处理。

例:补 5/1 → sched=5/1 → cdt=5/1, dt=4/30, tdt=5/2, pdt=4/29 → raw 抓 [4/30, 5/2) 写 raw dt=4/30;ods 取 raw dt=4/30 + raw dt=4/29 → filter DATE(update_time)=4/30 → 写 ods dt=4/30。

8. 串行重跑 / 日期递增幂等

raw / ods 都按 "sched 唯一锚定 + INSERT OVERWRITE 单 dt 分区" 模式:

  • 任意 sched 重跑:覆盖该 sched 对应的单个 dt 分区,不影响其他 dt
  • 日期递增串行(sched=5/1, 5/2, 5/3 ... 依次跑):每次写自己的 dt 分区,互不干扰
  • ✓ 幂等,✓ 任意顺序

Spark partitionOverwriteMode 实测

spark.sql.sources.partitionOverwriteMode 控制 Spark 在动态分区 INSERT OVERWRITE TABLE x PARTITION (col) 时的覆盖行为:

模式 动态分区 INSERT OVERWRITE 行为
STATIC 覆盖整张表所有分区,没在 SELECT 出现的历史 dt 全部消失
DYNAMIC 只覆盖 SELECT 实际产生的那些 dt,其他历史分区保留

PARTITION 子句静态指定值(如 PARTITION (dt='20260507'))不受此模式影响——无论 STATIC / DYNAMIC 都只覆盖指定的那个分区。

本环境实测(2026-05-07,tests/integration/spark/idempotence/partition_overwrite_default.sql):Spark 2.4 + CDH 6.3.2 + Hive ORC EXTERNAL TABLE 不显式设置时默认即 DYNAMIC —— 灌 5 个 dt 后跑动态分区 INSERT OVERWRITE 只产 2 个 dt,剩 5 个分区(03/04 被覆盖,01/02/05 保留),符合 DYNAMIC 语义。

不需要在 dw_base/spark/spark_sql.py 显式设 DYNAMIC,也不需要在 ods SQL 顶部加 SET。本环境实测结果是这样,但 Spark / Hive 升级或 CDH 配置变更可能改变默认行为,回归靠 §"幂等测试"。

幂等测试(入仓守护)

幂等性必须由 tests/ 下集成测试守护:跑两次(或日期递增多次)验证 dt 分区集合一致、行数一致。具体测试设计见后续阶段。