Эх сурвалжийг харах

refactor(datax): 分区 dt 改用 start_date(业务日),与 ADR-03 raw 48h 宽窗对齐

老 dt = stop-1 在单日窗口下 == 业务日是巧合;一旦 raw 走 48h 宽窗
(stop = 业务日+2),dt 跑到业务日+1 与业务日错位。改 dt = start_date
后业务日 = 分区名,分区内允许含次日漂移(按 ADR-03 raw 不纠正分区漂移)。

- partition.py: 删 compute_partition_dt,parse_ini_partition 用 start_date
- hdfs_writer.py: ${dt} 替换从 stop_date-1 改 start_date,docstring 同步
- entry.py: 调用方传 start_date
- test_partition.py: 单测 schema 翻
- kb/93 ADR-03 + kb/90 §2.6 实现建议 5 + kb/20 §7.2 表格 + §7.3 写入窗口 +
  kb/94 重构对比第 10 行同步表述
tianyu.chu 1 долоо хоног өмнө
parent
commit
a7ba435e51

+ 1 - 1
dw_base/datax/entry.py

@@ -145,7 +145,7 @@ class DataxImport(_BaseDatax):
         if not skip_partition:
             ddls = []
             for ini in ini_list:
-                ddl = partition.parse_ini_partition(ini, stop_date)
+                ddl = partition.parse_ini_partition(ini, start_date)
                 if ddl:
                     ddls.append(ddl)
             partition.execute_ddls(ddls)

+ 6 - 15
dw_base/datax/partition.py

@@ -7,30 +7,22 @@ DataX hive-import 场景的分区管理(从 bin/datax-multiple-hive-job-starte
 - parse_ini_partition: 从 ini 的 writer.path 抽 {db}.{table},返回 ALTER DDL;非分区表返回 None
 - execute_ddls: 调 hive -e 执行一批 ALTER DDL
 
-关键约束(kb/90 §2.6 实现建议 5):分区 dt 用 stop_date - 1 day,与 HDFS writer 对齐;
-不沿用老脚本 START_DATE = dt 的假设(单日范围下两者相等没问题,多日范围错位)。
+关键约束:分区 dt = start_date(业务日),与 HDFS writer 对齐;分区内允许含次日漂移
+数据(按 ADR-03 raw 不纠正分区漂移)。配套 raw 48h 宽窗:where 用 [start, stop) 而 dt
+跟 start_date,抓到的"漂到次日"记录统一落 dt=start_date 分区,由 ods 按 update_time 归位。
 """
 import subprocess
 from configparser import ConfigParser
-from datetime import datetime, timedelta
 from typing import List, Optional
 
 
 _DT_PLACEHOLDER = '/dt=${dt}'
 
 
-def compute_partition_dt(stop_date: str) -> str:
-    """
-    分区日期 = stop_date - 1 day(与 HDFS writer 对齐)。
-    stop_date 格式 yyyyMMdd。
-    """
-    stop_at = datetime.strptime(stop_date, '%Y%m%d')
-    return (stop_at - timedelta(days=1)).strftime('%Y%m%d')
-
-
-def parse_ini_partition(ini_path: str, stop_date: str) -> Optional[str]:
+def parse_ini_partition(ini_path: str, start_date: str) -> Optional[str]:
     """
     读 ini 的 writer.path 提取 {db}.{table},生成 ALTER ADD PARTITION DDL。
+    分区 dt = start_date(业务日)。
     非分区表(path 不含 /dt=${dt})或找不到 {db}.{table} 段 → None。
     """
     cp = ConfigParser()
@@ -52,9 +44,8 @@ def parse_ini_partition(ini_path: str, stop_date: str) -> Optional[str]:
     if not (db and tbl):
         return None
 
-    dt = compute_partition_dt(stop_date)
     return 'ALTER TABLE {db}.{tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
-        db=db, tbl=tbl, dt=dt,
+        db=db, tbl=tbl, dt=start_date,
     )
 
 

+ 3 - 4
dw_base/datax/plugins/writer/hdfs_writer.py

@@ -22,7 +22,8 @@ class HDFSWriter(Writer):
     HDFSWriter有4个时间字段:
     1. start_date:表示数据的起始日期,从Reader处传递而来,全量时传递19700101即可
     2. stop_date:表示数据的终止日期,从Reader传递而来
-    3. dt:表示分区日期,值为stop_date - 1 day
+    3. dt:表示分区日期,值为 start_date(业务日);分区内允许含次日漂移数据
+       (按 ADR-03 raw 不纠正分区漂移,宽窗 [start, stop) 抓到的全部入此分区)
     4. biz_date:表示文件的前缀,当start_date + 1 day = stop_date时,值为start_date,否则值为${start_date}-${stop_date - 1 day}
     """
 
@@ -34,9 +35,7 @@ class HDFSWriter(Writer):
         path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
         self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
         if path.__contains__('${dt}'):
-            stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
-            dt = (stop_at - timedelta(days=1)).strftime('%Y%m%d')
-            path = path.replace('${dt}', dt)
+            path = path.replace('${dt}', self.start_date)
         self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
         self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
             self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE) or 'text'

+ 2 - 2
kb/20-数仓分层与建模.md

@@ -205,7 +205,7 @@ RDS PG / ES ──DataX──▶ RAW ──SparkSQL──▶ ODS ──▶ DWD 
 
 | 层 | 分区键语义 | 时间字段来源 | 漂移容忍 | 重复容忍 | 分区间关系 |
 |---|---|---|---|---|---|
-| Raw | 批次日(系统时间) | `dt = stop - 1`(抽取任务的逻辑日期) | **容忍** | **容忍**(同 pk 同 dt 内可多条) | 独立,可含跨日漂移数据 |
+| Raw | 批次日(系统时间) | `dt = start_date`(业务日) | **容忍** | **容忍**(同 pk 同 dt 内可多条) | 独立,可含跨日漂移数据 |
 | ODS | 记录最后写入日(系统时间) | `DATE(update_time)` | 不容忍 | 分区内不容忍、**跨分区容忍**(保留 `update_time` 轨迹) | 独立,同 pk 可跨多分区 |
 | DWD 事实表 | 业务行为发生日(业务时间) | `DATE(order_create_time)` / `DATE(event_time)` 等业务字段 | 不容忍 | 不容忍(事件不可变) | 独立,追加写 |
 | DIM 拉链表 | 不按时间分区(或 `is_current` 二级分区) | —— | —— | **多版本非重复**(`[start_date, end_date)` 区间不重叠) | 每行是状态生效区间 |
@@ -239,7 +239,7 @@ RDS PG / ES ──DataX──▶ RAW ──SparkSQL──▶ ODS ──▶ DWD 
 
 > 基础职责见 §2 + §8.1。
 
-- **写入窗口**:抽取窗口 `[day-start, day+1-stop)`(48 小时宽),所有抓到的记录统一落 `dt = stop - 1` 分区
+- **写入窗口**:抽取窗口 `[day-start, day+1-stop)`(48 小时宽),所有抓到的记录统一落 `dt = start_date`(业务日)分区
 - **设计理由**:宽窗覆盖"零点漂移"和"覆盖式更新下的永久丢失",保证数据永不丢失
 - **代价**:分区里混有"未来时间"的记录 + 同 pk 可能重复出现,接受这两个代价换抽取逻辑简单
 

+ 1 - 1
kb/90-重构路线.md

@@ -194,7 +194,7 @@ L3   SparkSQL(...) 显式传参  +  extra_spark_config  +  命令行 -sc
 2. 模块放 `dw_base/datax/` 包内,不横跨 `io/` / `utils/` —— **放宽聚簇 B2 前置**(四模块边界定稿前先落地)。等 B2 定稿后,`path_utils.log_path` 等纯函数、`worker.ssh_run` 中跨包部分再挪到 `dw_base/utils/` / `dw_base/io/`,只改 import 路径
 3. `-inis <dir>` 目录扫描只递归 `.ini` 文件
 4. 老脚本 `datax-{single,multiple,multiple-hive}-job-starter.{sh,py}` + `datax-job-config-generator.py` 在冒烟 2(新入口端到端通过)后整体删,**不保留兼容转发封装**
-5. `partition.py` 预建分区的 dt 必须用 `stop_date - 1 day`,和 HDFS writer(见 `hdfs_writer.py:23-26`)对齐;不沿用老脚本 `START_DATE` 作为 dt 的假设——老脚本在单日范围(`start+1 == stop`)下两者相等看不出问题,多日范围(如补 N 天)会出现 HDFS writer 写 `stop-1` 分区、`ALTER ADD PARTITION` 建 `start` 分区的错位,数据落的分区没被预建
+5. `partition.py` 预建分区的 dt 用 `start_date`(业务日),和 HDFS writer(见 `hdfs_writer.py:23-26`)对齐;分区内允许含次日漂移数据(按 ADR-03 raw 不纠正分区漂移,配套 raw 48h 宽窗机制)。早先方案曾用 `stop_date - 1 day`——单日窗口下与 start_date 数值相等没问题,但与 raw 宽窗(stop = 业务日+2)配合时分区会跑到业务日+1 与业务日错位,反向校正回 start_date
 
 **本轮不做、后延 ADR**:下列能力属新增需求、非老入口平迁,暂不实现;若将来出现明确场景,单独开 ADR 落 `kb/93`。按优先级分:
 

+ 1 - 1
kb/93-架构决策.md

@@ -70,7 +70,7 @@
 - **决策**:
   - raw 层 where 右界扩展为次日同期:`where update_time >= '{day-start} 00:00' AND update_time < '{day+1-stop} 00:00'`,在关闭侧加 1 天 buffer 覆盖漂移
   - buffer 取 1 天而非 N 小时:**定时可变更**(同步执行时刻从 6 点调到 3 点或 8 点都不影响窗口语义)+ **可维护**(固定区间可复跑、可回刷)
-  - raw 层不纠正分区漂移:所有抓到的记录(含"漂到次日"的)按 ini `dt = stop-1` 统一落当日分区
+  - raw 层不纠正分区漂移:所有抓到的记录(含"漂到次日"的)按 ini `dt = start_date`(业务日)统一落当日分区,分区内允许含次日漂移数据
   - ods 层 Spark SQL 用**动态分区** `PARTITION (dt)`,按每行 `update_time` 真实日期归位
   - ods 写入模式两方案并列(默认方案**本 ADR 不预设**,实施时按具体表需求选 / 可表级混用):
     - **方案 A:`INSERT OVERWRITE`(覆盖式)** —— 每次 ods 跑覆盖对应 dt 分区。不丢数据:**数据只会向后漂移一天**(某条 update_time=N 号 的记录若 N 号 raw 没抓到漂到 N+1 号,N+1 号 raw 必然抓到——漂移不会再漂到 N+2 号),N+1 号 ods 覆盖 dt=N 号 时能补齐

+ 1 - 1
kb/94-重构对比.md

@@ -18,7 +18,7 @@
 | 7 | 优雅与否 | `set -e` + command-substitution 静默退出坑;`parse_ddl` 用 `grep "path =" ` 字面子串(多空格对齐就漏);5 个硬编码数组 + `--override` 从未激活;`__contains__(k)` 反 Pythonic | `in` / ConfigParser / os.path 标准用法;NamedTuple / 纯函数 / 闭包分层;类型注解;异常抛堆栈 |
 | 8 | 死代码 | `plugin.py: import pwd` 无引用、`mysql_reader` 顶层 import 已删的 `dw_base.database.mysql_utils`、5 个 `xxx_array=()` 空壳、`--override` case 无作用对象、`conf/datax/config/` 前缀剥离(目录已归档 bak)、`conf/datax/generated` 路径默认值 | 无(单测 47 条覆盖) |
 | 9 | Hive 分区管理 | `parse_ddl` 依赖 ini `path = ` 单空格字面子串、从 path 按 `/` 切段找 `.db` 段 | `partition.parse_ini_partition` 走 `ConfigParser.get('writer', 'path')` 正规读;`.db` 段抽取逻辑同语义但容错更好 |
-| 10 | 分区 dt 计算 | `START_DATE`(单日范围对、多日范围错位 —— 见 kb/90 §2.6 实现建议 5 的坑) | `stop_date - 1 day`,和 HDFS writer 注释对齐 |
+| 10 | 分区 dt 计算 | `START_DATE` 但 writer 用 `stop-1` → 不一致(单日范围歧义掩盖、多日范围错位) | `start_date`(业务日),partition + writer 双方对齐;分区内允许含次日漂移(配套 ADR-03 raw 48h 宽窗) |
 | 11 | 字段脱敏 | 不支持;需要在 ini 里自己手写 querySql 用 SQL 表达式脱敏 | `[mask]` 声明式段自动生成 querySql;PG 5 种脱敏(3 静态 `month_trunc` / `md5` / `mask_middle` + 2 动态 `keep_first_{n}` / `keep_last_{n}`),列名白名单防 SQL 注入 |
 | 12 | Workers 配置 | 硬编码在 `init.sh:13-28`(老 m3/d1-d4 列表 + 权重 map) | 外配 `conf/workers.ini`(新 cdhmaster02/cdhnode01-03) |
 | 13 | 日志路径 | 多维派生:`${LOG_ROOT_DIR}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${START_DATE}/${JOB_NAME}.log`,路径含老 `conf/datax/config/` 残留语义 | 扁平化:`${LOG_ROOT_DIR}/datax/${dt}/${job_name}.log`(对齐 kb/90 §7.2.1) |

+ 9 - 17
tests/unit/datax/test_partition.py

@@ -3,7 +3,7 @@ import textwrap
 from pathlib import Path
 from unittest.mock import patch
 
-from dw_base.datax.partition import compute_partition_dt, execute_ddls, parse_ini_partition
+from dw_base.datax.partition import execute_ddls, parse_ini_partition
 
 
 def _write_ini(tmp_path: Path, content: str) -> str:
@@ -12,21 +12,13 @@ def _write_ini(tmp_path: Path, content: str) -> str:
     return str(p)
 
 
-def test_compute_partition_dt_stop_minus_1():
-    assert compute_partition_dt('20260423') == '20260422'
-
-
-def test_compute_partition_dt_month_boundary():
-    assert compute_partition_dt('20260401') == '20260331'
-
-
 def test_parse_partitioned_writer(tmp_path):
     ini = _write_ini(tmp_path, '''\
         [writer]
         dataSource = hdfs/prd-ha
         path = /user/hive/warehouse/test.db/raw_usr_app_user_cert_info_inc_d/dt=${dt}/
     ''')
-    ddl = parse_ini_partition(ini, stop_date='20260423')
+    ddl = parse_ini_partition(ini, start_date='20260422')
     assert ddl == ('ALTER TABLE test.raw_usr_app_user_cert_info_inc_d '
                    'ADD IF NOT EXISTS PARTITION(dt=20260422);')
 
@@ -37,7 +29,7 @@ def test_parse_non_partitioned(tmp_path):
         dataSource = hdfs/xx
         path = /user/hive/warehouse/test.db/non_partitioned/
     ''')
-    assert parse_ini_partition(ini, stop_date='20260423') is None
+    assert parse_ini_partition(ini, start_date='20260422') is None
 
 
 def test_parse_no_writer_path(tmp_path):
@@ -45,18 +37,18 @@ def test_parse_no_writer_path(tmp_path):
         [writer]
         dataSource = mongo/xx
     ''')
-    assert parse_ini_partition(ini, stop_date='20260423') is None
+    assert parse_ini_partition(ini, start_date='20260422') is None
 
 
-def test_parse_dt_aligns_with_stop_minus_1_for_multiday(tmp_path):
-    # 多日范围 start=20260401 stop=20260410:dt 应 = stop-1 = 20260409
-    # 避免老脚本 START_DATE=dt 假设引入的多日范围分区错位
+def test_parse_dt_aligns_with_start_date_for_multiday(tmp_path):
+    # 多日范围 start=20260401 stop=20260410:dt 应 = start_date = 20260401(业务日)
+    # 与 HDFS writer 对齐,分区内允许含 [start, stop) 范围内所有数据
     ini = _write_ini(tmp_path, '''\
         [writer]
         path = /user/hive/warehouse/db1.db/t1/dt=${dt}/
     ''')
-    ddl = parse_ini_partition(ini, stop_date='20260410')
-    assert 'PARTITION(dt=20260409)' in ddl
+    ddl = parse_ini_partition(ini, start_date='20260401')
+    assert 'PARTITION(dt=20260401)' in ddl
 
 
 @patch('dw_base.datax.partition.subprocess.run')