# 埋点数据同步实现(开发文档) > 受众:内部 / 接手者 / AI。本文档是实现细节,与 `kb/13-埋点同步-设计.md` 配套。 > 设计取舍、合规背景、协作流程见设计文档,不重述。 ## 1. raw 层 DDL ### 1.1 表名 `raw.raw_usr_traces_apd_d`(域 = usr 含埋点;源表名 = traces;apd 不可变事件;周期 d) ### 1.2 字段清单 事件核心字段拍平 + 长尾字段(properties_json / params_json)兜底: ```sql CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_traces_apd_d ( -- 事件核心 event_name STRING COMMENT '事件名', event_time TIMESTAMP COMMENT '事件发生时间(_source.time 毫秒转)', flush_time TIMESTAMP COMMENT 'SDK 上报时间(_source.flushTime)', evt_type STRING COMMENT '事件类型(track / track_signup)', -- 用户标识 login_id STRING COMMENT '登录用户 ID(_source.loginId)', anonymous_id STRING COMMENT '匿名设备 ID(_source.anonymousId)', distinct_id STRING COMMENT 'SDK 唯一标识', user_id STRING COMMENT '业务用户 ID(properties.userId)', user_name STRING COMMENT '用户昵称', user_lvl STRING COMMENT '用户等级', -- SDK lib_name STRING COMMENT 'SDK 端类型', lib_version STRING COMMENT 'SDK 版本', lib_method STRING COMMENT 'SDK 触发方式', -- 设备 os STRING COMMENT '操作系统', os_version STRING COMMENT '系统版本', device_id STRING COMMENT '设备 ID', model STRING COMMENT '设备型号', manufacturer STRING COMMENT '厂商', network_type STRING COMMENT '网络类型', screen_width INT COMMENT '屏幕宽', screen_height INT COMMENT '屏幕高', -- 应用 app_id STRING COMMENT 'App 包名', app_name STRING COMMENT 'App 名称', app_version STRING COMMENT 'App 版本', wgt_version STRING COMMENT 'wgt 版本', is_first_day BOOLEAN COMMENT '是否首日', -- 长尾兜底(已脱敏后版本) properties_json STRING COMMENT 'properties 整体 JSON(含已拍平字段,便于回查)', params_json STRING COMMENT 'properties.params JSON(按事件不同 schema)', -- 技术字段 src_sys STRING COMMENT '源系统标识', src_tbl STRING COMMENT '源表名(ES index)', etl_time TIMESTAMP COMMENT 'ETL 处理时间' ) COMMENT '埋点 raw 层(脱敏后)' PARTITIONED BY (dt STRING COMMENT 'yyyymmdd') STORED AS ORC LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d'; ``` DDL 文件落点:`manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql` > **与 §8.1 default ORC 一致;与现 testbed `test.raw_usr_traces_apd_d` 的单列 STRING + TEXTFILE 不同**——testbed 是冒烟阶段产物,本方案不复用其 schema。 ## 2. 脱敏配置 ### 2.1 配置文件 `conf/tracking-mask.ini`(入库,与 alerter / workers / datax-tuning 同级别) 格式(按选项 3 不加固,仅声明含敏事件): ```ini # 仅声明含敏事件;未在此文件出现的事件 → 全字段原样入 raw(兜底) # 配置中已声明事件的字段: # drop = field1, field2 → 整字段删除(不入 raw) # mask = field3:method, field4:method → 字段脱敏(method 见 dw_base/datax/mask.py) # 同一字段同时出现在 drop 和 mask 时,drop 优先 [event:OrderPay] drop = receiver_address, receiver_phone mask = receiver_name:mask_middle, order_amount:keep_first_2 [event:Refund] drop = bank_account ``` ### 2.2 加载模块 `dw_base/tracking/mask.py`(参考 `dw_base/datax/mask.py` 模式,复用其 5 种脱敏函数)。 接口签名: ```python def load_mask_conf(path: str = 'conf/tracking-mask.ini') -> Dict[str, Dict]: """ 返回 {event_name: {'drop': [...field], 'mask': {field: method_str}}} """ def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict: """ 对单条事件的 properties dict 应用脱敏: - drop 字段 del 掉 - mask 字段调对应 method 处理 - 未在 conf 的 event_name → properties 原样返回(兜底) 返回脱敏后的 dict(不修改入参) """ ``` 复用 `dw_base/datax/mask.py` 的 5 种脱敏函数:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`。 单测:`tests/unit/tracking/test_mask.py`(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级) ## 3. 历史数据入仓 ### 3.1 工具 `jobs/raw/usr/raw_usr_traces_apd_d.py`(基于现 `tests/integration/tracking/raw_usr_traces_apd_d.py` 演进) 变化点: - `HIVE_DB` 从 `test` 改回 `raw` - `HDFS_TBL_DIR` 改为 `/user/hive/warehouse/raw.db/raw_usr_traces_apd_d` - 不再 put 整 .gz 文件到 HDFS(旧方案是单列 STRING + TEXTFILE 直存),改为: 1. 解压 .gz 2. 逐行解析 JSON 3. 提取拍平字段(按 §1.2 schema) 4. 调 `dw_base.tracking.mask.apply_mask` 脱敏 properties 5. 序列化为 ORC(PySpark / pyorc) 6. put HDFS → ALTER TABLE ADD PARTITION ### 3.2 CLI 继承现脚本 `-dt YYYYMMDD` 4 种形式(单日 / `20260407-` / 区间 / 离散);复用 `dw_base.utils.datetime_utils.get_date_range` 解析。 ### 3.3 现 testbed 数据 按设计文档 §7.2,本轮**暂不处理** `test.raw_usr_traces_apd_d`。后续视情况 drop 重做。 ## 4. 增量数据入仓 ### 4.1 ES Storage Handler CDH 6.3.2 上有过往使用经验,可用。具体安装 / Hive 配置 / jar 路径在落地时补本节。 ### 4.2 ES 映射表 每日 SQL 创建一个 ES external table 指向当日索引(用完即丢): ```sql CREATE TEMPORARY EXTERNAL TABLE traces_es_${dt} ( raw_doc STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES ( 'es.resource' = 'traces-${dt}', 'es.nodes' = '...', -- 其他配置详见落地时补 ); ``` ### 4.3 每日 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`(落地时补完整 SQL) 关键点: 1. SELECT 时按 event_name 分支调用脱敏 UDF(封装 `dw_base.tracking.mask.apply_mask`) 2. 字段拍平 + properties_json + params_json 3. `INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')` ### 4.4 调度 DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。 ## 5. 协作流程的实现 ### 5.1 新事件审计 SQL 每日 raw 跑完后执行(实现待落地): ```sql -- 当日 raw 出现的 event_name diff 配置已知集合 -- 配置已知集合 = conf/tracking-mask.ini 里所有 [event:XXX] section 名 -- 输出:(new_event_names, sample_count) → 推 alerter ``` ### 5.2 alerter 推送 复用 `conf/alerter.ini`,新增事件推到数仓内部群(具体 channel key 落地时定)。 ## 6. 落地 checklist - [ ] 写 `manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql` - [ ] 写 `conf/tracking-mask.ini` 初版(含至少 1 个含敏事件示例) - [ ] 写 `dw_base/tracking/mask.py` 模块 + `tests/unit/tracking/test_mask.py` 单测 - [ ] 改造 `tests/integration/tracking/raw_usr_traces_apd_d.py` → `jobs/raw/usr/raw_usr_traces_apd_d.py`(DB / 路径 / 脱敏逻辑) - [ ] 验证 ES Storage Handler 在新 CDH 集群可用 + 写映射 SQL 模板 - [ ] 写增量 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql` - [ ] 历史 4 个 dt 文件按新方案重跑(不动现 testbed,写到正式 raw 表) - [ ] 上 DS 调度 - [ ] 写新事件审计 SQL + alerter 集成 ## 7. 与现 testbed 的关系 现 `test.raw_usr_traces_apd_d`(单列 raw_json STRING + TEXTFILE)属冒烟测试遗留,**不是本方案 raw 表**。本方案上线后两种处置选择: - 选择 1:drop 现表 + 历史 4 个 dt 重做按新方案入正式 raw 表 - 选择 2:现 test 表保留作历史快照,新方案 raw 表重新启动(推荐) 按设计文档 §7.2 暂不处理,留待落地时单独评估。