spark_traces_udf.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # -*- coding:utf-8 -*-
  2. """
  3. 埋点入仓脱敏 Spark UDF。
  4. 注册为 SQL UDF `mask_source(line) -> STRING`:输入单条 ES hit(NDJSON 行),
  5. 解析 `_source`、按事件脱敏 `properties`(含嵌套 `params`),回吐脱敏后的 `_source` JSON 字符串。
  6. es_id / event_name 不敏感,在 SQL 侧用 get_json_object 原生取,不经本 UDF。
  7. 注册机制:spark-sql-starter `-u` 加载本文件,普通函数自动注册为 StringType 返回的 SQL UDF
  8. (@udf 装饰的 struct UDF 不会被注册,故返回 String 而非 struct)。
  9. 脱敏配置 conf/tracking-mask.ini 经 SQL `ADD FILE` 分发到 executor,首次调用经 SparkFiles 懒加载。
  10. """
  11. import json
  12. from pyspark import SparkFiles
  13. from dw_base.tracking import mask
  14. _CONF = None
  15. _CONF_NAME = 'tracking-mask.ini'
  16. def _conf():
  17. """executor 端懒加载脱敏配置(driver 注册时配置尚未分发,不能 module-level 加载)。"""
  18. global _CONF
  19. if _CONF is None:
  20. _CONF = mask.load_mask_conf(SparkFiles.get(_CONF_NAME))
  21. return _CONF
  22. def mask_source(line):
  23. """ES hit NDJSON 行 → 脱敏后的 _source JSON 字符串;解析失败返回 None。"""
  24. if not line:
  25. return None
  26. try:
  27. doc = json.loads(line)
  28. except (TypeError, ValueError):
  29. return None
  30. source = doc.get('_source')
  31. if not isinstance(source, dict):
  32. return None
  33. props = source.get('properties')
  34. if isinstance(props, dict):
  35. source['properties'] = mask.apply_mask(source.get('event'), props, _conf())
  36. return json.dumps(source, ensure_ascii=False)