# -*- coding:utf-8 -*- """ 埋点入仓脱敏 Spark UDF。 注册为 SQL UDF `mask_source(line) -> STRING`:输入单条 ES hit(NDJSON 行), 解析 `_source`、按事件脱敏 `properties`(含嵌套 `params`),回吐脱敏后的 `_source` JSON 字符串。 es_id / event_name 不敏感,在 SQL 侧用 get_json_object 原生取,不经本 UDF。 注册机制:spark-sql-starter `-u` 加载本文件,普通函数自动注册为 StringType 返回的 SQL UDF (@udf 装饰的 struct UDF 不会被注册,故返回 String 而非 struct)。 脱敏配置 conf/tracking-mask.ini 经 SQL `ADD FILE` 分发到 executor,首次调用经 SparkFiles 懒加载。 """ import json from pyspark import SparkFiles from dw_base.tracking import mask _CONF = None _CONF_NAME = 'tracking-mask.ini' def _conf(): """executor 端懒加载脱敏配置(driver 注册时配置尚未分发,不能 module-level 加载)。""" global _CONF if _CONF is None: _CONF = mask.load_mask_conf(SparkFiles.get(_CONF_NAME)) return _CONF def mask_source(line): """ES hit NDJSON 行 → 脱敏后的 _source JSON 字符串;解析失败返回 None。""" if not line: return None try: doc = json.loads(line) except (TypeError, ValueError): return None source = doc.get('_source') if not isinstance(source, dict): return None props = source.get('properties') if isinstance(props, dict): source['properties'] = mask.apply_mask(source.get('event'), props, _conf()) return json.dumps(source, ensure_ascii=False)