# 埋点数据同步实现(开发文档) ## 1. raw 层 DDL ### 1.1 表名 `raw.raw_usr_traces_his_o` 历史埋点表 `raw.raw_usr_traces_apd_d ` 增量埋点表 ### 1.2 字段清单 事件核心字段拍平 + 长尾字段(properties_json / params_json)兜底: ```sql CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_traces_apd_d ( -- 事件核心 event_name STRING COMMENT '事件名', event_time TIMESTAMP COMMENT '事件发生时间(_source.time 毫秒转)', flush_time TIMESTAMP COMMENT 'SDK 上报时间(_source.flushTime)', evt_type STRING COMMENT '事件类型(track / track_signup)', -- 用户标识 login_id STRING COMMENT '登录用户 ID(_source.loginId)', anonymous_id STRING COMMENT '匿名设备 ID(_source.anonymousId)', distinct_id STRING COMMENT 'SDK 唯一标识', user_id STRING COMMENT '业务用户 ID(properties.userId)', user_name STRING COMMENT '用户昵称', user_lvl STRING COMMENT '用户等级', -- SDK lib_name STRING COMMENT 'SDK 端类型', lib_version STRING COMMENT 'SDK 版本', lib_method STRING COMMENT 'SDK 触发方式', -- 设备 os STRING COMMENT '操作系统', os_version STRING COMMENT '系统版本', device_id STRING COMMENT '设备 ID', model STRING COMMENT '设备型号', manufacturer STRING COMMENT '厂商', network_type STRING COMMENT '网络类型', screen_width INT COMMENT '屏幕宽', screen_height INT COMMENT '屏幕高', -- 应用 app_id STRING COMMENT 'App 包名', app_name STRING COMMENT 'App 名称', app_version STRING COMMENT 'App 版本', wgt_version STRING COMMENT 'wgt 版本', is_first_day BOOLEAN COMMENT '是否首日', -- 长尾兜底(已脱敏后版本) properties_json STRING COMMENT 'properties 整体 JSON(含已拍平字段,便于回查)', params_json STRING COMMENT 'properties.params JSON(按事件不同 schema)', -- 技术字段 src_sys STRING COMMENT '源系统标识', src_tbl STRING COMMENT '源表名(ES index)', etl_time TIMESTAMP COMMENT 'ETL 处理时间' ) COMMENT '埋点 raw 层(脱敏后)' PARTITIONED BY (dt STRING COMMENT 'yyyymmdd') STORED AS ORC LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d'; ``` DDL 文件落点:`manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql` + `manual/ddl/raw/usr/raw_usr_traces_his_o_create.sql`。`raw.raw_usr_traces_his_o` schema 同上,仅表名和 LOCATION(`/user/hive/warehouse/raw.db/raw_usr_traces_his_o`)不同。 > **与 §8.1 default ORC 一致;与现 testbed `test.raw_usr_traces_apd_d` 的单列 STRING + TEXTFILE 不同**——testbed 是冒烟阶段产物,本方案不复用其 schema。 ## 2. 脱敏配置 ### 2.1 配置文件 `conf/tracking-mask.ini`(入库,与 alerter / workers / datax-tuning 同级别) 格式(按选项 3 不加固,仅声明含敏事件): ```ini # 仅声明含敏事件;未在此文件出现的事件 → 全字段原样入 raw(兜底) # 配置中已声明事件的字段: # drop = field1, field2 → 整字段删除(不入 raw) # mask = field3:method, field4:method → 字段脱敏(method 见 dw_base/datax/mask.py) # 同一字段同时出现在 drop 和 mask 时,drop 优先 [event:OrderPay] drop = receiver_address, receiver_phone mask = receiver_name:mask_middle, order_amount:keep_first_2 [event:Refund] drop = bank_account ``` ### 2.2 加载模块 `dw_base/tracking/mask.py`(参考 `dw_base/datax/mask.py` 模式,复用其 5 种脱敏函数)。 接口签名: ```python def load_mask_conf(path: str = 'conf/tracking-mask.ini') -> Dict[str, Dict]: """ 返回 {event_name: {'drop': [...field], 'mask': {field: method_str}}} """ def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict: """ 对单条事件的 properties dict 应用脱敏: - drop 字段 del 掉 - mask 字段调对应 method 处理 - 未在 conf 的 event_name → properties 原样返回(兜底) 返回脱敏后的 dict(不修改入参) """ ``` 复用 `dw_base/datax/mask.py` 的 5 种脱敏函数:`md5 / month_trunc / mask_middle / keep_first_n / keep_last_n`。 单测:`tests/unit/tracking/test_mask.py`(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级) ### 2.3 Spark UDF `dw_base/udf/business/spark_traces_udf.py`:包装 `apply_mask` 为 Spark UDF `parse_and_mask(raw_es_hit_json STRING) → STRUCT<...>`,输入 ES hit 整行 JSON,输出拍平字段 + properties_json + params_json(已脱敏)。两条入仓路径(历史 / 增量)共用同一份 UDF,差异仅在数据源 SQL。 ## 3. 历史数据入仓 ### 3.1 工具 `jobs/raw/usr/raw_usr_traces_his_o.sql`:Spark SQL,`USING text` 读 HDFS gz 临时目录 → 调 §2.3 `parse_and_mask` UDF → INSERT OVERWRITE 写 raw `his_o` 分区。 `jobs/raw/usr/raw_usr_traces_his_o.py`:极薄包装脚本,三步: 1. `hdfs dfs -put` 本地 .gz 到 HDFS 临时目录(如 `/tmp/raw_usr_traces_his/{dt}/`) 2. 调 `bin/spark-sql-starter.py -f` 跑 SQL 3. 清 HDFS 临时目录 ### 3.2 CLI 继承现脚本 `-dt YYYYMMDD` 4 种形式(单日 / `20260407-` / 区间 / 离散);复用 `dw_base.utils.datetime_utils.get_date_range` 解析。 ## 4. 增量数据入仓 ### 4.1 ES Storage Handler CDH 6.3.2 上有过往使用经验,可用。具体安装 / Hive 配置 / jar 路径在落地时补本节。 ### 4.2 ES 映射表 每日 SQL 创建一个 ES external table 指向当日索引(用完即丢): ```sql CREATE TEMPORARY EXTERNAL TABLE traces_es_${dt} ( raw_doc STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES ( 'es.resource' = 'traces-${dt}', 'es.nodes' = '...', -- 其他配置详见落地时补 ); ``` ### 4.3 每日 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`(落地时补完整 SQL) 关键点: 1. CREATE TEMP VIEW 映射当日 ES 索引(见 §4.2) 2. SELECT 调 §2.3 `parse_and_mask` UDF,字段拍平 + properties_json + params_json 3. `INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')` ### 4.4 调度 DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。 ## 5. 落地 checklist - [ ] 写 `manual/ddl/raw/usr/raw_usr_traces_{his_o,apd_d}_create.sql` - [ ] 写 `conf/tracking-mask.ini` 初版(含至少 1 个含敏事件示例) - [ ] 写 `dw_base/tracking/mask.py` 模块 + `tests/unit/tracking/test_mask.py` 单测 - [ ] 写 `dw_base/udf/business/spark_traces_udf.py`(封装 mask 为 Spark UDF) - [ ] 写历史 SQL `jobs/raw/usr/raw_usr_traces_his_o.sql` + 包装脚本 `.py`(hdfs put + 调 starter + 清 tmp) - [ ] 写增量 SQL `jobs/raw/usr/raw_usr_traces_apd_d.sql`(CREATE TEMP VIEW ES + INSERT OVERWRITE) - [ ] 历史gz文件按新方案重跑 - [ ] 上 DS 调度