Browse Source

docs(kb): 14 历史/增量入仓改 PySpark+UDF 共用方案 + 13 微调

- §1.2 加 his_o 表 schema 同 apd_d 注脚(仅 LOCATION 不同)
- §2.3 新增 Spark UDF 小节:dw_base/udf/business/spark_traces_udf.py
  封装 apply_mask,两路径共用
- §3.1 历史改 SQL(USING text)+ 极薄包装脚本(hdfs put → starter → 清 tmp),
  替换原单机 Python 6 步流程
- §4.3 每日 SQL 关键点指向 §2.3 UDF
- §5 checklist 加 UDF 一项 + 同步 his_o/apd_d 路径
- kb/13 设计文档同步 user 已有修订(两表 / 简化)
tianyu.chu 1 week ago
parent
commit
3d85e2cd17
2 changed files with 35 additions and 109 deletions
  1. 12 60
      kb/13-埋点同步-设计.md
  2. 23 49
      kb/14-埋点同步-开发.md

+ 12 - 60
kb/13-埋点同步-设计.md

@@ -1,12 +1,9 @@
 # 埋点数据同步方案(设计文档)
 
-> 受众:产品 / CTO / 协作方。本文档说明埋点数据从 ES 进入数仓的整体方案、合规取舍、协作约束。
-> 实现细节见 `kb/14-埋点同步-开发.md`。
-
 ## 1. 背景与需求
 
 - **数据源**:埋点系统后端 = Elasticsearch(生产单环境,无 dev / test)
-- **量级**:约 1.4M event / 天;事件类型 50+ 长尾
+- **量级**:约 1.4M event / 天;gz文件大小约200M; 事件类型 50+ 长尾
 - **存量形式**:按天 NDJSON.gz 文件(ES `_search` 导出,每行一个 ES hit 文档)
 - **增量形式**:实时写入 ES `traces-{YYYY-MM-DD}` 滚动索引
 - **业务目的**:用户行为分析(留存、漏斗、转化、活跃度等)
@@ -16,10 +13,7 @@
 - **公司原则**:敏感数据不出业务库 / 不入数仓
 - **涉敏举例**:
   - 下单事件:收货地址、收件人姓名、收件人手机号
-  - 退款事件:银行卡号
   - 其他事件按业务实际声明
-- **合规优先级**:合规 > 数据完整性 > 开发便利
-- **合规对"入仓"的定义**:物理存储层面的存在,不是逻辑层面的可访问性。一旦敏感字段落地到 HDFS / Hive 表,即使下游不查询也算违规
 
 ## 3. 整体方案
 
@@ -46,23 +40,21 @@
            └──────────────────┘
 ```
 
-两条路径**最终落到同一张 raw 表**,schema 一致;脱敏规则共用同一份配置。
+两条路径**最终落到张 raw 表**,schema 一致;脱敏规则共用同一份配置。
 
 ## 4. 设计取舍:raw 层默认范式破例
 
-数仓默认 raw 层走 schema-on-read landing 范式(数据原样落、ods 阶段才做类型转换 / 脏数据识别,详见 `kb/20-数仓分层与建模.md §8.1`)。本场景**主动破例**:
+数仓默认 raw 层走 schema-on-read landing 范式(数据原样落、ods 阶段才做类型转换 / 脏数据识别)。本场景**主动破例**:
 
 | §8.1 理由 | 是否适用 | 说明 |
 |---|---|---|
-| 1. 隔离源端类型变化 | ✗ | 埋点 schema 长尾,新事件 / 新字段频繁,本就要求协作通知 |
+| 1. 隔离源端类型变化 | ✗ | 埋点 schema 长尾,新事件 / 新字段频繁,要求协作通知 |
 | 2. 同步阶段不可失败 | △ | 部分适用,需对解析失败做容错处理 |
-| 3. 保留原始精度与原文 | ✗ | 合规要求就是要丢字段,与"保原文"目标冲突 |
+| 3. 保留原始精度与原文 | ✗ | 合规要求就是要丢字段 |
 | 4. 脏数据可观测 | △ | 事件本身可观测,脱敏后字段不可回溯(合规接受这代价) |
 | 5. schema-on-read 范式契合 | ✗ | 合规要求物理拦截敏感字段,不能等 ods 读取阶段才处理 |
 
-3 条不适用 ≥ §8.1 阈值,破例成立。
-
-**破例代价(明示)**:
+**破例代价**:
 
 - 入仓阶段需做 JSON 解析 + 字段脱敏 + 配置化逻辑
 - 埋点上下游 schema 变更需协作通知(违反"隔离源端"原则)
@@ -78,70 +70,30 @@
 | `mask` | 字段保留但用脱敏函数处理 | 中敏字段(脱敏后的姓名、订单号末四位等) |
 | `reject` | 整事件不入 raw | 极敏感事件(如有) |
 
-脱敏函数复用现有的 5 种:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`(实现见开发文档)
+脱敏函数复用现有的 5 种:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`。
 
 ### 5.2 配置驱动
 
-所有脱敏规则集中在一份配置文件(`conf/tracking-mask.ini`),格式与加载方式见开发文档
+所有脱敏规则集中在一份配置文件(`conf/tracking-mask.ini`)。
 
-### 5.3 兜底策略:选项 3 不加固
+### 5.3 兜底策略:
 
 - **未在配置中的事件 / 字段**:默认允许,全部原样入 raw
 - **配置中显式声明 drop / mask 的字段**:按规则处理
 - **风险**:新含敏事件没及时进配置 → 原文入仓 → 违规
-- **风险缓解**:靠协作流程(§6)+ 审计机制(§6.2)
-
-> **候选选项(已否决)**:
->
-> - **选项 1(事件级白名单)**:任何新事件不在 conf 都不入仓——卡业务上线节奏,否决
-> - **选项 2(字段级白名单仅对已声明事件)**:含敏事件首次进 conf 时所有字段必须登记,后续上游加字段自动 drop——多一道保险但仍有"全新含敏事件没及时进 conf"风险,与选项 3 漏的根本场景相同,多余的复杂度不抵收益,否决
+- **风险缓解**:靠协作流程
 
 ## 6. 协作流程
 
-### 6.1 三种新事件场景
-
 | 场景 | 处理 |
 |---|---|
 | 新事件 + 无敏感字段 | 默认入仓,无需通知 |
-| 新事件 + 有敏感字段 | 强制走流程:埋点开发方 PR 注明 → 数仓更新配置 |
+| 新事件 + 有敏感字段 | 强制走流程:埋点开发方注明 → 数仓更新配置 |
 | 已有事件加敏感字段 | 同上 |
 
-### 6.2 审计机制
-
-每日 ods 跑前对比当日 ES 出现的 event_name 集合 vs 配置已知集合,新增事件列表推 alerter(企微)。
-
-数仓收到通知后人工判断是否含敏:
-- 含敏 → 立即更新 conf
-- 不含敏 → 无需操作(继续兜底)
-
-### 6.3 应急响应
-
-发现敏感字段误入仓时:
-1. 立即更新 conf 加 drop / mask 规则
-2. 历史数据:对受影响 dt 分区跑回填,覆盖原数据
-3. 复盘:分析为何协作流程失守
-
-## 7. 风险与决策点
-
-### 7.1 已识别风险
-
-- **R1 协作流程失守**:新含敏事件没及时进 conf → 违规
-  - 缓解:审计机制(§6.2)+ 应急响应流程(§6.3)
-- **R2 ES Storage Handler 故障**:增量路径中断
-  - 缓解:DS 调度告警 + 重跑(INSERT OVERWRITE 幂等)
-- **R3 历史数据脱敏不彻底**:早期入仓的非脱敏数据残留
-  - 缓解:现 testbed 数据按本方案重跑(落地范围决策见 §7.2)
-
-### 7.2 已锁定决策
-
-- **加固方案**:选项 3 不加固
-- **raw 表组织**:单 wide schema 表(不按事件分多张)
-- **现 testbed 处置**:暂不处理(drop 重做或换名共存留待后续单独评估)
-
-## 8. 落地范围
+## 7. 落地范围
 
 本方案仅覆盖 raw 层入仓 + 脱敏。后续阶段(不在本方案):
 
 - ods 层 schema 设计(按事件分流解析 params_json)
 - dwd / dws / ads 层建模
-- 实时埋点路径(如需要)

+ 23 - 49
kb/14-埋点同步-开发.md

@@ -1,13 +1,12 @@
 # 埋点数据同步实现(开发文档)
 
-> 受众:内部 / 接手者 / AI。本文档是实现细节,与 `kb/13-埋点同步-设计.md` 配套。
-> 设计取舍、合规背景、协作流程见设计文档,不重述。
-
 ## 1. raw 层 DDL
 
 ### 1.1 表名
 
-`raw.raw_usr_traces_apd_d`(域 = usr 含埋点;源表名 = traces;apd 不可变事件;周期 d)
+`raw.raw_usr_traces_his_o` 历史埋点表
+
+`raw.raw_usr_traces_apd_d ` 增量埋点表
 
 ### 1.2 字段清单
 
@@ -60,7 +59,7 @@ STORED AS ORC
 LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d';
 ```
 
-DDL 文件落点:`manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql`
+DDL 文件落点:`manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql` + `manual/ddl/raw/usr/raw_usr_traces_his_o_create.sql`。`raw.raw_usr_traces_his_o` schema 同上,仅表名和 LOCATION(`/user/hive/warehouse/raw.db/raw_usr_traces_his_o`)不同。
 
 > **与 §8.1 default ORC 一致;与现 testbed `test.raw_usr_traces_apd_d` 的单列 STRING + TEXTFILE 不同**——testbed 是冒烟阶段产物,本方案不复用其 schema。
 
@@ -113,30 +112,28 @@ def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict:
 
 单测:`tests/unit/tracking/test_mask.py`(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级)
 
+### 2.3 Spark UDF
+
+`dw_base/udf/business/spark_traces_udf.py`:包装 `apply_mask` 为 Spark UDF `parse_and_mask(raw_es_hit_json STRING) → STRUCT<...>`,输入 ES hit 整行 JSON,输出拍平字段 + properties_json + params_json(已脱敏)。两条入仓路径(历史 / 增量)共用同一份 UDF,差异仅在数据源 SQL。
+
 ## 3. 历史数据入仓
 
 ### 3.1 工具
 
-`jobs/raw/usr/raw_usr_traces_apd_d.py`(基于现 `tests/integration/tracking/raw_usr_traces_apd_d.py` 演进)
+`jobs/raw/usr/raw_usr_traces_his_o.sql`:Spark SQL,`USING text` 读 HDFS gz 临时目录 → 调 §2.3 `parse_and_mask` UDF → INSERT OVERWRITE 写 raw `his_o` 分区。
 
-变化点:
-- `HIVE_DB` 从 `test` 改回 `raw`
-- `HDFS_TBL_DIR` 改为 `/user/hive/warehouse/raw.db/raw_usr_traces_apd_d`
-- 不再 put 整 .gz 文件到 HDFS(旧方案是单列 STRING + TEXTFILE 直存),改为:
-  1. 解压 .gz
-  2. 逐行解析 JSON
-  3. 提取拍平字段(按 §1.2 schema)
-  4. 调 `dw_base.tracking.mask.apply_mask` 脱敏 properties
-  5. 序列化为 ORC(PySpark / pyorc)
-  6. put HDFS → ALTER TABLE ADD PARTITION
+`jobs/raw/usr/raw_usr_traces_his_o.py`:极薄包装脚本,三步:
+1. `hdfs dfs -put` 本地 .gz 到 HDFS 临时目录(如 `/tmp/raw_usr_traces_his/{dt}/`)
+2. 调 `bin/spark-sql-starter.py -f` 跑 SQL
+3. 清 HDFS 临时目录
+
+> 不走 Python 单机解析:1.4M event/天单机不利用集群算力;改 PySpark + UDF 后,与增量路径共用 §2.3 UDF,逻辑统一。
 
 ### 3.2 CLI
 
 继承现脚本 `-dt YYYYMMDD` 4 种形式(单日 / `20260407-` / 区间 / 离散);复用 `dw_base.utils.datetime_utils.get_date_range` 解析。
 
-### 3.3 现 testbed 数据
 
-按设计文档 §7.2,本轮**暂不处理** `test.raw_usr_traces_apd_d`。后续视情况 drop 重做。
 
 ## 4. 增量数据入仓
 
@@ -165,47 +162,24 @@ TBLPROPERTIES (
 `jobs/raw/usr/raw_usr_traces_apd_d.sql`(落地时补完整 SQL)
 
 关键点:
-1. SELECT 时按 event_name 分支调用脱敏 UDF(封装 `dw_base.tracking.mask.apply_mask`
-2. 字段拍平 + properties_json + params_json
+1. CREATE TEMP VIEW 映射当日 ES 索引(见 §4.2
+2. SELECT 调 §2.3 `parse_and_mask` UDF,字段拍平 + properties_json + params_json
 3. `INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')`
 
 ### 4.4 调度
 
 DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。
 
-## 5. 协作流程的实现
-
-### 5.1 新事件审计 SQL
-
-每日 raw 跑完后执行(实现待落地):
 
-```sql
--- 当日 raw 出现的 event_name diff 配置已知集合
--- 配置已知集合 = conf/tracking-mask.ini 里所有 [event:XXX] section 名
--- 输出:(new_event_names, sample_count) → 推 alerter
-```
-
-### 5.2 alerter 推送
-
-复用 `conf/alerter.ini`,新增事件推到数仓内部群(具体 channel key 落地时定)。
 
-## 6. 落地 checklist
+## 5. 落地 checklist
 
-- [ ] 写 `manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql`
+- [ ] 写 `manual/ddl/raw/usr/raw_usr_traces_{his_o,apd_d}_create.sql`
 - [ ] 写 `conf/tracking-mask.ini` 初版(含至少 1 个含敏事件示例)
 - [ ] 写 `dw_base/tracking/mask.py` 模块 + `tests/unit/tracking/test_mask.py` 单测
-- [ ] 改造 `tests/integration/tracking/raw_usr_traces_apd_d.py` → `jobs/raw/usr/raw_usr_traces_apd_d.py`(DB / 路径 / 脱敏逻辑
-- [ ] 验证 ES Storage Handler 在新 CDH 集群可用 + 写映射 SQL 模板
-- [ ] 写增量 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`
-- [ ] 历史 4 个 dt 文件按新方案重跑(不动现 testbed,写到正式 raw 表)
+- [ ] 写 `dw_base/udf/business/spark_traces_udf.py`(封装 mask 为 Spark UDF)
+- [ ] 写历史 SQL `jobs/raw/usr/raw_usr_traces_his_o.sql` + 包装脚本 `.py`(hdfs put + 调 starter + 清 tmp)
+- [ ] 写增量 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`(CREATE TEMP VIEW ES + INSERT OVERWRITE)
+- [ ] 历史 4 个 dt 文件按新方案重跑
 - [ ] 上 DS 调度
-- [ ] 写新事件审计 SQL + alerter 集成
-
-## 7. 与现 testbed 的关系
-
-现 `test.raw_usr_traces_apd_d`(单列 raw_json STRING + TEXTFILE)属冒烟测试遗留,**不是本方案 raw 表**。本方案上线后两种处置选择:
-
-- 选择 1:drop 现表 + 历史 4 个 dt 重做按新方案入正式 raw 表
-- 选择 2:现 test 表保留作历史快照,新方案 raw 表重新启动(推荐)
 
-按设计文档 §7.2 暂不处理,留待落地时单独评估。