raw.raw_usr_traces_his_o 历史埋点表
raw.raw_usr_traces_apd_d 增量埋点表
事件核心字段拍平 + 长尾字段(properties_json / params_json)兜底:
CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_traces_apd_d (
-- 事件核心
event_name STRING COMMENT '事件名',
event_time TIMESTAMP COMMENT '事件发生时间(_source.time 毫秒转)',
flush_time TIMESTAMP COMMENT 'SDK 上报时间(_source.flushTime)',
evt_type STRING COMMENT '事件类型(track / track_signup)',
-- 用户标识
login_id STRING COMMENT '登录用户 ID(_source.loginId)',
anonymous_id STRING COMMENT '匿名设备 ID(_source.anonymousId)',
distinct_id STRING COMMENT 'SDK 唯一标识',
user_id STRING COMMENT '业务用户 ID(properties.userId)',
user_name STRING COMMENT '用户昵称',
user_lvl STRING COMMENT '用户等级',
-- SDK
lib_name STRING COMMENT 'SDK 端类型',
lib_version STRING COMMENT 'SDK 版本',
lib_method STRING COMMENT 'SDK 触发方式',
-- 设备
os STRING COMMENT '操作系统',
os_version STRING COMMENT '系统版本',
device_id STRING COMMENT '设备 ID',
model STRING COMMENT '设备型号',
manufacturer STRING COMMENT '厂商',
network_type STRING COMMENT '网络类型',
screen_width INT COMMENT '屏幕宽',
screen_height INT COMMENT '屏幕高',
-- 应用
app_id STRING COMMENT 'App 包名',
app_name STRING COMMENT 'App 名称',
app_version STRING COMMENT 'App 版本',
wgt_version STRING COMMENT 'wgt 版本',
is_first_day BOOLEAN COMMENT '是否首日',
-- 长尾兜底(已脱敏后版本)
properties_json STRING COMMENT 'properties 整体 JSON(含已拍平字段,便于回查)',
params_json STRING COMMENT 'properties.params JSON(按事件不同 schema)',
-- 技术字段
src_sys STRING COMMENT '源系统标识',
src_tbl STRING COMMENT '源表名(ES index)',
etl_time TIMESTAMP COMMENT 'ETL 处理时间'
)
COMMENT '埋点 raw 层(脱敏后)'
PARTITIONED BY (dt STRING COMMENT 'yyyymmdd')
STORED AS ORC
LOCATION '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d';
DDL 文件落点:manual/ddl/raw/usr/raw_usr_traces_apd_d_create.sql + manual/ddl/raw/usr/raw_usr_traces_his_o_create.sql。raw.raw_usr_traces_his_o schema 同上,仅表名和 LOCATION(/user/hive/warehouse/raw.db/raw_usr_traces_his_o)不同。
与 §8.1 default ORC 一致;与现 testbed
test.raw_usr_traces_apd_d的单列 STRING + TEXTFILE 不同——testbed 是冒烟阶段产物,本方案不复用其 schema。
conf/tracking-mask.ini(入库,与 alerter / workers / datax-tuning 同级别)
格式(按选项 3 不加固,仅声明含敏事件):
# 仅声明含敏事件;未在此文件出现的事件 → 全字段原样入 raw(兜底)
# 配置中已声明事件的字段:
# drop = field1, field2 → 整字段删除(不入 raw)
# mask = field3:method, field4:method → 字段脱敏(method 见 dw_base/datax/mask.py)
# 同一字段同时出现在 drop 和 mask 时,drop 优先
[event:OrderPay]
drop = receiver_address, receiver_phone
mask = receiver_name:mask_middle, order_amount:keep_first_2
[event:Refund]
drop = bank_account
dw_base/tracking/mask.py(参考 dw_base/datax/mask.py 模式,复用其 5 种脱敏函数)。
接口签名:
def load_mask_conf(path: str = 'conf/tracking-mask.ini') -> Dict[str, Dict]:
"""
返回 {event_name: {'drop': [...field], 'mask': {field: method_str}}}
"""
def apply_mask(event_name: str, properties: Dict, conf: Dict) -> Dict:
"""
对单条事件的 properties dict 应用脱敏:
- drop 字段 del 掉
- mask 字段调对应 method 处理
- 未在 conf 的 event_name → properties 原样返回(兜底)
返回脱敏后的 dict(不修改入参)
"""
复用 dw_base/datax/mask.py 的 5 种脱敏函数:md5 / month_trunc / mask_middle / keep_first_n / keep_last_n。
单测:tests/unit/tracking/test_mask.py(覆盖 drop / mask / 未声明事件兜底 / drop+mask 同字段优先级)
dw_base/udf/business/spark_traces_udf.py:包装 apply_mask 为 Spark UDF parse_and_mask(raw_es_hit_json STRING) → STRUCT<...>,输入 ES hit 整行 JSON,输出拍平字段 + properties_json + params_json(已脱敏)。两条入仓路径(历史 / 增量)共用同一份 UDF,差异仅在数据源 SQL。
jobs/raw/usr/raw_usr_traces_his_o.sql:Spark SQL,USING text 读 HDFS gz 临时目录 → 调 §2.3 parse_and_mask UDF → INSERT OVERWRITE 写 raw his_o 分区。
jobs/raw/usr/raw_usr_traces_his_o.py:极薄包装脚本,三步:
hdfs dfs -put 本地 .gz 到 HDFS 临时目录(如 /tmp/raw_usr_traces_his/{dt}/)bin/spark-sql-starter.py -f 跑 SQL继承现脚本 -dt YYYYMMDD 4 种形式(单日 / 20260407- / 区间 / 离散);复用 dw_base.utils.datetime_utils.get_date_range 解析。
CDH 6.3.2 上有过往使用经验,可用。具体安装 / Hive 配置 / jar 路径在落地时补本节。
每日 SQL 创建一个 ES external table 指向当日索引(用完即丢):
CREATE TEMPORARY EXTERNAL TABLE traces_es_${dt} (
raw_doc STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
'es.resource' = 'traces-${dt}',
'es.nodes' = '...',
-- 其他配置详见落地时补
);
jobs/raw/usr/raw_usr_traces_apd_d.sql(落地时补完整 SQL)
关键点:
parse_and_mask UDF,字段拍平 + properties_json + params_jsonINSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt='${dt}')DolphinScheduler 工作流,每日 T+1 跑一次。失败重跑(INSERT OVERWRITE 幂等)。
manual/ddl/raw/usr/raw_usr_traces_{his_o,apd_d}_create.sqlconf/tracking-mask.ini 初版(含至少 1 个含敏事件示例)dw_base/tracking/mask.py 模块 + tests/unit/tracking/test_mask.py 单测dw_base/udf/business/spark_traces_udf.py(封装 mask 为 Spark UDF)jobs/raw/usr/raw_usr_traces_his_o.sql + 包装脚本 .py(hdfs put + 调 starter + 清 tmp)jobs/raw/usr/raw_usr_traces_apd_d.sql(CREATE TEMP VIEW ES + INSERT OVERWRITE)