| 123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- # -*- coding:utf-8 -*-
- """
- 埋点入仓脱敏 Spark UDF。
- 注册为 SQL UDF `mask_source(line) -> STRING`:输入单条 ES hit(NDJSON 行),
- 解析 `_source`、按事件脱敏 `properties`(含嵌套 `params`),回吐脱敏后的 `_source` JSON 字符串。
- es_id / event_name 不敏感,在 SQL 侧用 get_json_object 原生取,不经本 UDF。
- 注册机制:spark-sql-starter `-u` 加载本文件,普通函数自动注册为 StringType 返回的 SQL UDF
- (@udf 装饰的 struct UDF 不会被注册,故返回 String 而非 struct)。
- 脱敏配置 conf/tracking-mask.ini 经 SQL `ADD FILE` 分发到 executor,首次调用经 SparkFiles 懒加载。
- """
- import json
- from pyspark import SparkFiles
- from dw_base.tracking import mask
- _CONF = None
- _CONF_NAME = 'tracking-mask.ini'
- def _conf():
- """executor 端懒加载脱敏配置(driver 注册时配置尚未分发,不能 module-level 加载)。"""
- global _CONF
- if _CONF is None:
- _CONF = mask.load_mask_conf(SparkFiles.get(_CONF_NAME))
- return _CONF
- def mask_source(line):
- """ES hit NDJSON 行 → 脱敏后的 _source JSON 字符串;解析失败返回 None。"""
- if not line:
- return None
- try:
- doc = json.loads(line)
- except (TypeError, ValueError):
- return None
- source = doc.get('_source')
- if not isinstance(source, dict):
- return None
- props = source.get('properties')
- if isinstance(props, dict):
- source['properties'] = mask.apply_mask(source.get('event'), props, _conf())
- return json.dumps(source, ensure_ascii=False)
|