Explorar el Código

docs(kb): 加 13/14 埋点同步设计 + 开发文档

- kb/13-埋点同步-设计.md:受众产品/CTO,合规背景、破例论证、
  脱敏策略(选项 3 不加固)、协作流程
- kb/14-埋点同步-开发.md:raw DDL、conf/tracking-mask.ini 格式、
  历史 + 增量入仓实现、落地 checklist
- README 索引补 1x 业务上下文两行
tianyu.chu hace 1 semana
padre
commit
aa8a6bad59
Se han modificado 3 ficheros con 360 adiciones y 0 borrados
  1. 2 0
      README.md
  2. 147 0
      kb/13-埋点同步-设计.md
  3. 211 0
      kb/14-埋点同步-开发.md

+ 2 - 0
README.md

@@ -75,6 +75,8 @@ PG/ES ──DataX(raw)──> RAW ──> ODS ──> DWD ──> DWS ──> TD
 | [10-业务流程](kb/10-业务流程.md) | **业务全景**:用户 + 商家 + 售后全链路流程 |
 | [11-数据资产](kb/11-数据资产.md) | **数据源清单**:业务库 / 埋点 / 爬虫 / 采购 |
 | [12-同步方案](kb/12-同步方案.md) | **同步策略**:PG→Hive 存量 / 增量 / 历史归档 / CDC 阶段演进 |
+| [13-埋点同步-设计](kb/13-埋点同步-设计.md) | **埋点同步设计**:合规约束、整体方案、脱敏策略、协作流程(受众:产品 / CTO) |
+| [14-埋点同步-开发](kb/14-埋点同步-开发.md) | **埋点同步实现**:raw DDL、脱敏配置、历史 / 增量入仓脚本、落地 checklist |
 
 ### 2x 数仓建模
 

+ 147 - 0
kb/13-埋点同步-设计.md

@@ -0,0 +1,147 @@
+# 埋点数据同步方案(设计文档)
+
+> 受众:产品 / CTO / 协作方。本文档说明埋点数据从 ES 进入数仓的整体方案、合规取舍、协作约束。
+> 实现细节见 `kb/14-埋点同步-开发.md`。
+
+## 1. 背景与需求
+
+- **数据源**:埋点系统后端 = Elasticsearch(生产单环境,无 dev / test)
+- **量级**:约 1.4M event / 天;事件类型 50+ 长尾
+- **存量形式**:按天 NDJSON.gz 文件(ES `_search` 导出,每行一个 ES hit 文档)
+- **增量形式**:实时写入 ES `traces-{YYYY-MM-DD}` 滚动索引
+- **业务目的**:用户行为分析(留存、漏斗、转化、活跃度等)
+
+## 2. 合规约束
+
+- **公司原则**:敏感数据不出业务库 / 不入数仓
+- **涉敏举例**:
+  - 下单事件:收货地址、收件人姓名、收件人手机号
+  - 退款事件:银行卡号
+  - 其他事件按业务实际声明
+- **合规优先级**:合规 > 数据完整性 > 开发便利
+- **合规对"入仓"的定义**:物理存储层面的存在,不是逻辑层面的可访问性。一旦敏感字段落地到 HDFS / Hive 表,即使下游不查询也算违规
+
+## 3. 整体方案
+
+```
+                 ┌────────────────┐
+                 │  埋点 ES (prod) │
+                 └────┬───────────┘
+                      │
+         ┌────────────┴─────────────┐
+         │                          │
+    [增量路径]                  [历史路径]
+   每日 ES → Hive          一次性 .json.gz 文件
+   ES Storage Handler            按天解压解析
+   映射表 SELECT                 Python 脚本
+         │                          │
+         └────────────┬─────────────┘
+                      │
+              【入仓阶段脱敏】
+              (按事件类型应用规则)
+                      │
+                      ▼
+           ┌──────────────────┐
+           │   raw 层(数仓)  │
+           └──────────────────┘
+```
+
+两条路径**最终落到同一张 raw 表**,schema 一致;脱敏规则共用同一份配置。
+
+## 4. 设计取舍:raw 层默认范式破例
+
+数仓默认 raw 层走 schema-on-read landing 范式(数据原样落、ods 阶段才做类型转换 / 脏数据识别,详见 `kb/20-数仓分层与建模.md §8.1`)。本场景**主动破例**:
+
+| §8.1 理由 | 是否适用 | 说明 |
+|---|---|---|
+| 1. 隔离源端类型变化 | ✗ | 埋点 schema 长尾,新事件 / 新字段频繁,本就要求协作通知 |
+| 2. 同步阶段不可失败 | △ | 部分适用,需对解析失败做容错处理 |
+| 3. 保留原始精度与原文 | ✗ | 合规要求就是要丢字段,与"保原文"目标冲突 |
+| 4. 脏数据可观测 | △ | 事件本身可观测,脱敏后字段不可回溯(合规接受这代价) |
+| 5. schema-on-read 范式契合 | ✗ | 合规要求物理拦截敏感字段,不能等 ods 读取阶段才处理 |
+
+3 条不适用 ≥ §8.1 阈值,破例成立。
+
+**破例代价(明示)**:
+
+- 入仓阶段需做 JSON 解析 + 字段脱敏 + 配置化逻辑
+- 埋点上下游 schema 变更需协作通知(违反"隔离源端"原则)
+- ods 失去"raw 一定有原始数据可回溯"的兜底保障
+
+## 5. 脱敏策略
+
+### 5.1 三种脱敏动作
+
+| 动作 | 含义 | 适用 |
+|---|---|---|
+| `drop` | 字段直接删除,不入 raw | 高敏字段(地址、手机号、卡号等) |
+| `mask` | 字段保留但用脱敏函数处理 | 中敏字段(脱敏后的姓名、订单号末四位等) |
+| `reject` | 整事件不入 raw | 极敏感事件(如有) |
+
+脱敏函数复用现有的 5 种:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`(实现见开发文档)。
+
+### 5.2 配置驱动
+
+所有脱敏规则集中在一份配置文件(`conf/tracking-mask.ini`),格式与加载方式见开发文档。
+
+### 5.3 兜底策略:选项 3 不加固
+
+- **未在配置中的事件 / 字段**:默认允许,全部原样入 raw
+- **配置中显式声明 drop / mask 的字段**:按规则处理
+- **风险**:新含敏事件没及时进配置 → 原文入仓 → 违规
+- **风险缓解**:靠协作流程(§6)+ 审计机制(§6.2)
+
+> **候选选项(已否决)**:
+>
+> - **选项 1(事件级白名单)**:任何新事件不在 conf 都不入仓——卡业务上线节奏,否决
+> - **选项 2(字段级白名单仅对已声明事件)**:含敏事件首次进 conf 时所有字段必须登记,后续上游加字段自动 drop——多一道保险但仍有"全新含敏事件没及时进 conf"风险,与选项 3 漏的根本场景相同,多余的复杂度不抵收益,否决
+
+## 6. 协作流程
+
+### 6.1 三种新事件场景
+
+| 场景 | 处理 |
+|---|---|
+| 新事件 + 无敏感字段 | 默认入仓,无需通知 |
+| 新事件 + 有敏感字段 | 强制走流程:埋点开发方 PR 注明 → 数仓更新配置 |
+| 已有事件加敏感字段 | 同上 |
+
+### 6.2 审计机制
+
+每日 ods 跑前对比当日 ES 出现的 event_name 集合 vs 配置已知集合,新增事件列表推 alerter(企微)。
+
+数仓收到通知后人工判断是否含敏:
+- 含敏 → 立即更新 conf
+- 不含敏 → 无需操作(继续兜底)
+
+### 6.3 应急响应
+
+发现敏感字段误入仓时:
+1. 立即更新 conf 加 drop / mask 规则
+2. 历史数据:对受影响 dt 分区跑回填,覆盖原数据
+3. 复盘:分析为何协作流程失守
+
+## 7. 风险与决策点
+
+### 7.1 已识别风险
+
+- **R1 协作流程失守**:新含敏事件没及时进 conf → 违规
+  - 缓解:审计机制(§6.2)+ 应急响应流程(§6.3)
+- **R2 ES Storage Handler 故障**:增量路径中断
+  - 缓解:DS 调度告警 + 重跑(INSERT OVERWRITE 幂等)
+- **R3 历史数据脱敏不彻底**:早期入仓的非脱敏数据残留
+  - 缓解:现 testbed 数据按本方案重跑(落地范围决策见 §7.2)
+
+### 7.2 已锁定决策
+
+- **加固方案**:选项 3 不加固
+- **raw 表组织**:单 wide schema 表(不按事件分多张)
+- **现 testbed 处置**:暂不处理(drop 重做或换名共存留待后续单独评估)
+
+## 8. 落地范围
+
+本方案仅覆盖 raw 层入仓 + 脱敏。后续阶段(不在本方案):
+
+- ods 层 schema 设计(按事件分流解析 params_json)
+- dwd / dws / ads 层建模
+- 实时埋点路径(如需要)

+ 211 - 0
kb/14-埋点同步-开发.md

@@ -0,0 +1,211 @@
+# 埋点数据同步实现(开发文档)
+
+> 受众:内部 / 接手者 / AI。本文档是实现细节,与 `kb/13-埋点同步-设计.md` 配套。
+> 设计取舍、合规背景、协作流程见设计文档,不重述。
+
+## 1. raw 层 DDL
+
+### 1.1 表名
+
+`raw.raw_usr_traces_apd_d`(域 = usr 含埋点;源表名 = traces;apd 不可变事件;周期 d)
+
+### 1.2 字段清单
+
+事件核心字段拍平 + 长尾字段(properties_json / params_json)兜底:
+
+```sql
+CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_traces_apd_d (
+    -- 事件核心
+    event_name      STRING    COMMENT '事件名',
+    event_time      TIMESTAMP COMMENT '事件发生时间(_source.time 毫秒转)',
+    flush_time      TIMESTAMP COMMENT 'SDK 上报时间(_source.flushTime)',
+    evt_type        STRING    COMMENT '事件类型(track / track_signup)',
+    -- 用户标识
+    login_id        STRING    COMMENT '登录用户 ID(_source.loginId)',
+    anonymous_id    STRING    COMMENT '匿名设备 ID(_source.anonymousId)',
+    distinct_id     STRING    COMMENT 'SDK 唯一标识',
+    user_id         STRING    COMMENT '业务用户 ID(properties.userId)',
+    user_name       STRING    COMMENT '用户昵称',
+    user_lvl        STRING    COMMENT '用户等级',
+    -- SDK
+    lib_name        STRING    COMMENT 'SDK 端类型',
+    lib_version     STRING    COMMENT 'SDK 版本',
+    lib_method      STRING    COMMENT 'SDK 触发方式',
+    -- 设备
+    os              STRING    COMMENT '操作系统',
+    os_version      STRING    COMMENT '系统版本',
+    device_id       STRING    COMMENT '设备 ID',
+    model           STRING    COMMENT '设备型号',
+    manufacturer    STRING    COMMENT '厂商',
+    network_type    STRING    COMMENT '网络类型',
+    screen_width    INT       COMMENT '屏幕宽',
+    screen_height   INT       COMMENT '屏幕高',
+    -- 应用
+    app_id          STRING    COMMENT 'App 包名',
+    app_name        STRING    COMMENT 'App 名称',
+    app_version     STRING    COMMENT 'App 版本',
+    wgt_version     STRING    COMMENT 'wgt 版本',
+    is_first_day    BOOLEAN   COMMENT '是否首日',
+    -- 长尾兜底(已脱敏后版本)
+    properties_json STRING    COMMENT 'properties 整体 JSON(含已拍平字段,便于回查)',
+    params_json     STRING    COMMENT 'properties.params JSON(按事件不同 schema)',
+    -- 技术字段
+    src_sys         STRING    COMMENT '源系统标识',
+    src_tbl         STRING    COMMENT '源表名(ES index)',
+    etl_time        TIMESTAMP COMMENT 'ETL 处理时间'
+)
+COMMENT '埋点 raw 层(脱敏后)'
+PARTITIONED BY (dt STRING COMMENT 'yyyymmdd')
+STORED AS ORC
+LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d';
+```
+
+DDL 文件落点:`manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql`
+
+> **与 §8.1 default ORC 一致;与现 testbed `test.raw_usr_traces_apd_d` 的单列 STRING + TEXTFILE 不同**——testbed 是冒烟阶段产物,本方案不复用其 schema。
+
+## 2. 脱敏配置
+
+### 2.1 配置文件
+
+`conf/tracking-mask.ini`(入库,与 alerter / workers / datax-tuning 同级别)
+
+格式(按选项 3 不加固,仅声明含敏事件):
+
+```ini
+# 仅声明含敏事件;未在此文件出现的事件 → 全字段原样入 raw(兜底)
+# 配置中已声明事件的字段:
+#   drop = field1, field2  → 整字段删除(不入 raw)
+#   mask = field3:method, field4:method → 字段脱敏(method 见 dw_base/datax/mask.py)
+# 同一字段同时出现在 drop 和 mask 时,drop 优先
+
+[event:OrderPay]
+drop = receiver_address, receiver_phone
+mask = receiver_name:mask_middle, order_amount:keep_first_2
+
+[event:Refund]
+drop = bank_account
+```
+
+### 2.2 加载模块
+
+`dw_base/tracking/mask.py`(参考 `dw_base/datax/mask.py` 模式,复用其 5 种脱敏函数)。
+
+接口签名:
+
+```python
+def load_mask_conf(path: str = 'conf/tracking-mask.ini') -> Dict[str, Dict]:
+    """
+    返回 {event_name: {'drop': [...field], 'mask': {field: method_str}}}
+    """
+
+def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict:
+    """
+    对单条事件的 properties dict 应用脱敏:
+    - drop 字段 del 掉
+    - mask 字段调对应 method 处理
+    - 未在 conf 的 event_name → properties 原样返回(兜底)
+    返回脱敏后的 dict(不修改入参)
+    """
+```
+
+复用 `dw_base/datax/mask.py` 的 5 种脱敏函数:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`。
+
+单测:`tests/unit/tracking/test_mask.py`(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级)
+
+## 3. 历史数据入仓
+
+### 3.1 工具
+
+`jobs/raw/usr/raw_usr_traces_apd_d.py`(基于现 `tests/integration/tracking/raw_usr_traces_apd_d.py` 演进)
+
+变化点:
+- `HIVE_DB` 从 `test` 改回 `raw`
+- `HDFS_TBL_DIR` 改为 `/user/hive/warehouse/raw.db/raw_usr_traces_apd_d`
+- 不再 put 整 .gz 文件到 HDFS(旧方案是单列 STRING + TEXTFILE 直存),改为:
+  1. 解压 .gz
+  2. 逐行解析 JSON
+  3. 提取拍平字段(按 §1.2 schema)
+  4. 调 `dw_base.tracking.mask.apply_mask` 脱敏 properties
+  5. 序列化为 ORC(PySpark / pyorc)
+  6. put HDFS → ALTER TABLE ADD PARTITION
+
+### 3.2 CLI
+
+继承现脚本 `-dt YYYYMMDD` 4 种形式(单日 / `20260407-` / 区间 / 离散);复用 `dw_base.utils.datetime_utils.get_date_range` 解析。
+
+### 3.3 现 testbed 数据
+
+按设计文档 §7.2,本轮**暂不处理** `test.raw_usr_traces_apd_d`。后续视情况 drop 重做。
+
+## 4. 增量数据入仓
+
+### 4.1 ES Storage Handler
+
+CDH 6.3.2 上有过往使用经验,可用。具体安装 / Hive 配置 / jar 路径在落地时补本节。
+
+### 4.2 ES 映射表
+
+每日 SQL 创建一个 ES external table 指向当日索引(用完即丢):
+
+```sql
+CREATE TEMPORARY EXTERNAL TABLE traces_es_${dt} (
+    raw_doc STRING
+)
+STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
+TBLPROPERTIES (
+    'es.resource' = 'traces-${dt}',
+    'es.nodes'    = '...',
+    -- 其他配置详见落地时补
+);
+```
+
+### 4.3 每日 SQL
+
+`jobs/raw/usr/raw_usr_traces_apd_d.sql`(落地时补完整 SQL)
+
+关键点:
+1. SELECT 时按 event_name 分支调用脱敏 UDF(封装 `dw_base.tracking.mask.apply_mask`)
+2. 字段拍平 + properties_json + params_json
+3. `INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')`
+
+### 4.4 调度
+
+DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。
+
+## 5. 协作流程的实现
+
+### 5.1 新事件审计 SQL
+
+每日 raw 跑完后执行(实现待落地):
+
+```sql
+-- 当日 raw 出现的 event_name diff 配置已知集合
+-- 配置已知集合 = conf/tracking-mask.ini 里所有 [event:XXX] section 名
+-- 输出:(new_event_names, sample_count) → 推 alerter
+```
+
+### 5.2 alerter 推送
+
+复用 `conf/alerter.ini`,新增事件推到数仓内部群(具体 channel key 落地时定)。
+
+## 6. 落地 checklist
+
+- [ ] 写 `manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql`
+- [ ] 写 `conf/tracking-mask.ini` 初版(含至少 1 个含敏事件示例)
+- [ ] 写 `dw_base/tracking/mask.py` 模块 + `tests/unit/tracking/test_mask.py` 单测
+- [ ] 改造 `tests/integration/tracking/raw_usr_traces_apd_d.py` → `jobs/raw/usr/raw_usr_traces_apd_d.py`(DB / 路径 / 脱敏逻辑)
+- [ ] 验证 ES Storage Handler 在新 CDH 集群可用 + 写映射 SQL 模板
+- [ ] 写增量 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`
+- [ ] 历史 4 个 dt 文件按新方案重跑(不动现 testbed,写到正式 raw 表)
+- [ ] 上 DS 调度
+- [ ] 写新事件审计 SQL + alerter 集成
+
+## 7. 与现 testbed 的关系
+
+现 `test.raw_usr_traces_apd_d`(单列 raw_json STRING + TEXTFILE)属冒烟测试遗留,**不是本方案 raw 表**。本方案上线后两种处置选择:
+
+- 选择 1:drop 现表 + 历史 4 个 dt 重做按新方案入正式 raw 表
+- 选择 2:现 test 表保留作历史快照,新方案 raw 表重新启动(推荐)
+
+按设计文档 §7.2 暂不处理,留待落地时单独评估。