mask.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding:utf-8 -*-
  2. """
  3. 埋点入仓的字段级脱敏:在已解析的 properties dict 上按事件应用 drop / mask。
  4. 与 dw_base/datax/mask.py 的区别:
  5. - datax/mask.py 在源 PG 端生成脱敏 querySql,SQL 在业务库执行(敏感原值不出业务库);
  6. - 本模块作用在已解析的 JSON dict 上 —— 埋点无源库,gz 文件已从固定服务器导出,
  7. 脱敏在入 raw 前的 Spark UDF 里做(kb/13 §4 合规破例)。
  8. 两者共用同一套方法语义,单一真值见 conf/templates/datax/mask/mask.template.ini。
  9. 兜底:未在配置声明的事件 → properties 原样返回,靠协作流程补含敏事件(kb/13 §6)。
  10. """
  11. import hashlib
  12. import os
  13. import re
  14. from configparser import ConfigParser
  15. from copy import deepcopy
  16. _EVENT_PREFIX = 'event:'
  17. _KEEP_FIRST = re.compile(r'^keep_first_(\d+)$')
  18. _KEEP_LAST = re.compile(r'^keep_last_(\d+)$')
  19. _STATIC_METHODS = ('md5', 'month_trunc', 'mask_middle')
  20. def _to_text(value):
  21. return value if isinstance(value, str) else str(value)
  22. def _known_method(method):
  23. return method in _STATIC_METHODS or bool(_KEEP_FIRST.match(method)) or bool(_KEEP_LAST.match(method))
  24. def apply_method(method, value):
  25. """按方法名脱敏单个值;value 为 None 原样返回;未知方法抛 ValueError。"""
  26. if value is None:
  27. return None
  28. if method == 'md5':
  29. return hashlib.md5(_to_text(value).encode('utf-8')).hexdigest()
  30. if method == 'month_trunc':
  31. # 'YYYY-MM-DD...' / 'YYYY/MM/...' 截断到 'YYYY-MM',不匹配则原样返回
  32. m = re.match(r'^(\d{4})[-/](\d{2})', _to_text(value))
  33. return '{0}-{1}'.format(m.group(1), m.group(2)) if m else _to_text(value)
  34. if method == 'mask_middle':
  35. # 对齐 datax 正则 (.{3}).+(.{4}):长度 < 8 不脱敏
  36. text = _to_text(value)
  37. return text[:3] + '****' + text[-4:] if len(text) >= 8 else text
  38. m = _KEEP_FIRST.match(method)
  39. if m:
  40. n = int(m.group(1))
  41. return _to_text(value)[:n] + '****'
  42. m = _KEEP_LAST.match(method)
  43. if m:
  44. n = int(m.group(1))
  45. return '****' + (_to_text(value)[-n:] if n else '')
  46. raise ValueError(
  47. '未知脱敏方法 {0!r}(可用 md5 / month_trunc / mask_middle / keep_first_n / keep_last_n)'.format(method)
  48. )
  49. def apply_mask(event_name, properties, conf):
  50. """
  51. 对单条事件的 properties dict 应用脱敏,返回脱敏后的新 dict(不修改入参)。
  52. - 同时作用于 properties 顶层与嵌套的 properties['params']
  53. - drop 字段整删,mask 字段调对应 method
  54. - 同一字段同时在 drop 和 mask 时 drop 优先
  55. - event_name 未在 conf → properties 原样返回(兜底)
  56. """
  57. rule = conf.get(event_name)
  58. if not rule:
  59. return properties
  60. result = deepcopy(properties)
  61. layers = [result]
  62. params = result.get('params')
  63. if isinstance(params, dict):
  64. layers.append(params)
  65. for field in rule.get('drop', ()):
  66. for layer in layers:
  67. layer.pop(field, None)
  68. for field, method in rule.get('mask', {}).items():
  69. for layer in layers:
  70. if field in layer:
  71. layer[field] = apply_method(method, layer[field])
  72. return result
  73. def load_mask_conf(path):
  74. """
  75. 解析脱敏配置 → {event_name: {'drop': [...field], 'mask': {field: method}}}。
  76. 文件不存在直接抛错:脱敏配置缺失会导致敏感数据原样入 raw,不可静默
  77. (ConfigParser.read 对缺失文件静默返回,故先显式校验存在性)。
  78. """
  79. if not os.path.isfile(path):
  80. raise FileNotFoundError('脱敏配置不存在:{0}'.format(path))
  81. parser = ConfigParser()
  82. parser.optionxform = str # 保留字段名大小写(埋点字段是驼峰)
  83. parser.read(path, encoding='utf-8')
  84. conf = {}
  85. for section in parser.sections():
  86. if not section.startswith(_EVENT_PREFIX):
  87. continue
  88. event = section[len(_EVENT_PREFIX):]
  89. rule = {}
  90. if parser.has_option(section, 'drop'):
  91. rule['drop'] = [f.strip() for f in parser.get(section, 'drop').split(',') if f.strip()]
  92. if parser.has_option(section, 'mask'):
  93. mask = {}
  94. for item in parser.get(section, 'mask').split(','):
  95. item = item.strip()
  96. if not item:
  97. continue
  98. if ':' not in item:
  99. raise ValueError('[mask] 项格式应为 field:method,实际 {0!r}(section [{1}])'.format(item, section))
  100. field, method = item.split(':', 1)
  101. field, method = field.strip(), method.strip()
  102. if not _known_method(method):
  103. raise ValueError(
  104. '未知脱敏方法 {0!r}(section [{1}],可用 md5 / month_trunc / '
  105. 'mask_middle / keep_first_n / keep_last_n)'.format(method, section)
  106. )
  107. mask[field] = method
  108. rule['mask'] = mask
  109. conf[event] = rule
  110. return conf