14-埋点同步-开发.md 7.9 KB

埋点数据同步实现(开发文档)

受众:内部 / 接手者 / AI。本文档是实现细节,与 kb/13-埋点同步-设计.md 配套。 设计取舍、合规背景、协作流程见设计文档,不重述。

1. raw 层 DDL

1.1 表名

raw.raw_usr_traces_apd_d(域 = usr 含埋点;源表名 = traces;apd 不可变事件;周期 d)

1.2 字段清单

事件核心字段拍平 + 长尾字段(properties_json / params_json)兜底:

CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_traces_apd_d (
    -- 事件核心
    event_name      STRING    COMMENT '事件名',
    event_time      TIMESTAMP COMMENT '事件发生时间(_source.time 毫秒转)',
    flush_time      TIMESTAMP COMMENT 'SDK 上报时间(_source.flushTime)',
    evt_type        STRING    COMMENT '事件类型(track / track_signup)',
    -- 用户标识
    login_id        STRING    COMMENT '登录用户 ID(_source.loginId)',
    anonymous_id    STRING    COMMENT '匿名设备 ID(_source.anonymousId)',
    distinct_id     STRING    COMMENT 'SDK 唯一标识',
    user_id         STRING    COMMENT '业务用户 ID(properties.userId)',
    user_name       STRING    COMMENT '用户昵称',
    user_lvl        STRING    COMMENT '用户等级',
    -- SDK
    lib_name        STRING    COMMENT 'SDK 端类型',
    lib_version     STRING    COMMENT 'SDK 版本',
    lib_method      STRING    COMMENT 'SDK 触发方式',
    -- 设备
    os              STRING    COMMENT '操作系统',
    os_version      STRING    COMMENT '系统版本',
    device_id       STRING    COMMENT '设备 ID',
    model           STRING    COMMENT '设备型号',
    manufacturer    STRING    COMMENT '厂商',
    network_type    STRING    COMMENT '网络类型',
    screen_width    INT       COMMENT '屏幕宽',
    screen_height   INT       COMMENT '屏幕高',
    -- 应用
    app_id          STRING    COMMENT 'App 包名',
    app_name        STRING    COMMENT 'App 名称',
    app_version     STRING    COMMENT 'App 版本',
    wgt_version     STRING    COMMENT 'wgt 版本',
    is_first_day    BOOLEAN   COMMENT '是否首日',
    -- 长尾兜底(已脱敏后版本)
    properties_json STRING    COMMENT 'properties 整体 JSON(含已拍平字段,便于回查)',
    params_json     STRING    COMMENT 'properties.params JSON(按事件不同 schema)',
    -- 技术字段
    src_sys         STRING    COMMENT '源系统标识',
    src_tbl         STRING    COMMENT '源表名(ES index)',
    etl_time        TIMESTAMP COMMENT 'ETL 处理时间'
)
COMMENT '埋点 raw 层(脱敏后)'
PARTITIONED BY (dt STRING COMMENT 'yyyymmdd')
STORED AS ORC
LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d';

DDL 文件落点:manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql

与 §8.1 default ORC 一致;与现 testbed test.raw_usr_traces_apd_d 的单列 STRING + TEXTFILE 不同——testbed 是冒烟阶段产物,本方案不复用其 schema。

2. 脱敏配置

2.1 配置文件

conf/tracking-mask.ini(入库,与 alerter / workers / datax-tuning 同级别)

格式(按选项 3 不加固,仅声明含敏事件):

# 仅声明含敏事件;未在此文件出现的事件 → 全字段原样入 raw(兜底)
# 配置中已声明事件的字段:
#   drop = field1, field2  → 整字段删除(不入 raw)
#   mask = field3:method, field4:method → 字段脱敏(method 见 dw_base/datax/mask.py)
# 同一字段同时出现在 drop 和 mask 时,drop 优先

[event:OrderPay]
drop = receiver_address, receiver_phone
mask = receiver_name:mask_middle, order_amount:keep_first_2

[event:Refund]
drop = bank_account

2.2 加载模块

dw_base/tracking/mask.py(参考 dw_base/datax/mask.py 模式,复用其 5 种脱敏函数)。

接口签名:

def load_mask_conf(path: str = 'conf/tracking-mask.ini') -> Dict[str, Dict]:
    """
    返回 {event_name: {'drop': [...field], 'mask': {field: method_str}}}
    """

def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict:
    """
    对单条事件的 properties dict 应用脱敏:
    - drop 字段 del 掉
    - mask 字段调对应 method 处理
    - 未在 conf 的 event_name → properties 原样返回(兜底)
    返回脱敏后的 dict(不修改入参)
    """

复用 dw_base/datax/mask.py 的 5 种脱敏函数:md5 / month_trunc / mask_middle / keep_first_n / keep_last_n

单测:tests/unit/tracking/test_mask.py(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级)

3. 历史数据入仓

3.1 工具

jobs/raw/usr/raw_usr_traces_apd_d.py(基于现 tests/integration/tracking/raw_usr_traces_apd_d.py 演进)

变化点:

  • HIVE_DBtest 改回 raw
  • HDFS_TBL_DIR 改为 /user/hive/warehouse/raw.db/raw_usr_traces_apd_d
  • 不再 put 整 .gz 文件到 HDFS(旧方案是单列 STRING + TEXTFILE 直存),改为:
    1. 解压 .gz
    2. 逐行解析 JSON
    3. 提取拍平字段(按 §1.2 schema)
    4. dw_base.tracking.mask.apply_mask 脱敏 properties
    5. 序列化为 ORC(PySpark / pyorc)
    6. put HDFS → ALTER TABLE ADD PARTITION

3.2 CLI

继承现脚本 -dt YYYYMMDD 4 种形式(单日 / 20260407- / 区间 / 离散);复用 dw_base.utils.datetime_utils.get_date_range 解析。

3.3 现 testbed 数据

按设计文档 §7.2,本轮暂不处理 test.raw_usr_traces_apd_d。后续视情况 drop 重做。

4. 增量数据入仓

4.1 ES Storage Handler

CDH 6.3.2 上有过往使用经验,可用。具体安装 / Hive 配置 / jar 路径在落地时补本节。

4.2 ES 映射表

每日 SQL 创建一个 ES external table 指向当日索引(用完即丢):

CREATE TEMPORARY EXTERNAL TABLE traces_es_${dt} (
    raw_doc STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource' = 'traces-${dt}',
    'es.nodes'    = '...',
    -- 其他配置详见落地时补
);

4.3 每日 SQL

jobs/raw/usr/raw_usr_traces_apd_d.sql(落地时补完整 SQL)

关键点:

  1. SELECT 时按 event_name 分支调用脱敏 UDF(封装 dw_base.tracking.mask.apply_mask
  2. 字段拍平 + properties_json + params_json
  3. INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')

4.4 调度

DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。

5. 协作流程的实现

5.1 新事件审计 SQL

每日 raw 跑完后执行(实现待落地):

-- 当日 raw 出现的 event_name diff 配置已知集合
-- 配置已知集合 = conf/tracking-mask.ini 里所有 [event:XXX] section 名
-- 输出:(new_event_names, sample_count) → 推 alerter

5.2 alerter 推送

复用 conf/alerter.ini,新增事件推到数仓内部群(具体 channel key 落地时定)。

6. 落地 checklist

  • manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql
  • conf/tracking-mask.ini 初版(含至少 1 个含敏事件示例)
  • dw_base/tracking/mask.py 模块 + tests/unit/tracking/test_mask.py 单测
  • 改造 tests/integration/tracking/raw_usr_traces_apd_d.pyjobs/raw/usr/raw_usr_traces_apd_d.py(DB / 路径 / 脱敏逻辑)
  • 验证 ES Storage Handler 在新 CDH 集群可用 + 写映射 SQL 模板
  • 写增量 SQL jobs/raw/usr/raw_usr_traces_apd_d.sql
  • 历史 4 个 dt 文件按新方案重跑(不动现 testbed,写到正式 raw 表)
  • 上 DS 调度
  • 写新事件审计 SQL + alerter 集成

7. 与现 testbed 的关系

test.raw_usr_traces_apd_d(单列 raw_json STRING + TEXTFILE)属冒烟测试遗留,不是本方案 raw 表。本方案上线后两种处置选择:

  • 选择 1:drop 现表 + 历史 4 个 dt 重做按新方案入正式 raw 表
  • 选择 2:现 test 表保留作历史快照,新方案 raw 表重新启动(推荐)

按设计文档 §7.2 暂不处理,留待落地时单独评估。