| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- # -*- coding:utf-8 -*-
- """
- 埋点入仓的字段级脱敏:在已解析的 properties dict 上按事件应用 drop / mask。
- 与 dw_base/datax/mask.py 的区别:
- - datax/mask.py 在源 PG 端生成脱敏 querySql,SQL 在业务库执行(敏感原值不出业务库);
- - 本模块作用在已解析的 JSON dict 上 —— 埋点无源库,gz 文件已从固定服务器导出,
- 脱敏在入 raw 前的 Spark UDF 里做(kb/13 §4 合规破例)。
- 两者共用同一套方法语义,单一真值见 conf/templates/datax/mask/mask.template.ini。
- 兜底:未在配置声明的事件 → properties 原样返回,靠协作流程补含敏事件(kb/13 §6)。
- """
- import hashlib
- import os
- import re
- from configparser import ConfigParser
- from copy import deepcopy
# Config sections whose name starts with this prefix describe one event;
# the remainder of the section name is the event name.
_EVENT_PREFIX = 'event:'
# Parameterised method names: 'keep_first_<digits>' and 'keep_last_<digits>'.
# The captured group is the number of characters to keep.
_KEEP_FIRST = re.compile(r'^keep_first_(\d+)$')
_KEEP_LAST = re.compile(r'^keep_last_(\d+)$')
# Method names that take no parameter.
_STATIC_METHODS = ('md5', 'month_trunc', 'mask_middle')
def _to_text(value):
    """Return ``value`` itself when it is a ``str``, else ``str(value)``."""
    if isinstance(value, str):
        return value
    return str(value)
def _known_method(method):
    """Tell whether ``method`` is a recognised masking method name.

    A name is recognised when it is one of the parameterless methods in
    ``_STATIC_METHODS`` or matches the ``keep_first_<n>`` / ``keep_last_<n>``
    patterns.
    """
    if method in _STATIC_METHODS:
        return True
    return any(
        pattern.match(method) is not None
        for pattern in (_KEEP_FIRST, _KEEP_LAST)
    )
def apply_method(method, value):
    """Mask a single value with the named method.

    ``None`` is returned unchanged, and that check happens before the method
    name is inspected. Any other value is converted with ``_to_text`` first,
    so the result is always a string.

    Supported methods:
        md5: hex MD5 digest of the UTF-8 encoded text.
        month_trunc: text starting with ``YYYY-MM`` or ``YYYY/MM`` becomes
            ``'YYYY-MM'``; text that does not match is returned as is.
        mask_middle: first 3 characters + ``'****'`` + last 4 characters;
            text shorter than 8 characters is returned as is.
        keep_first_<n>: first ``n`` characters + ``'****'``.
        keep_last_<n>: ``'****'`` + last ``n`` characters.

    Raises:
        ValueError: ``method`` is none of the above (and ``value`` is not
            ``None``).
    """
    if value is None:
        return None

    if method == 'md5':
        encoded = _to_text(value).encode('utf-8')
        return hashlib.md5(encoded).hexdigest()

    if method == 'month_trunc':
        text = _to_text(value)
        hit = re.match(r'^(\d{4})[-/](\d{2})', text)
        if hit is None:
            # Not a recognisable date prefix: hand the text back untouched.
            return text
        return '{0}-{1}'.format(hit.group(1), hit.group(2))

    if method == 'mask_middle':
        # Mirrors the datax regex (.{3}).+(.{4}): fewer than 8 characters
        # leaves nothing between head and tail, so no masking is applied.
        text = _to_text(value)
        if len(text) < 8:
            return text
        return text[:3] + '****' + text[-4:]

    head = _KEEP_FIRST.match(method)
    if head is not None:
        keep = int(head.group(1))
        return _to_text(value)[:keep] + '****'

    tail = _KEEP_LAST.match(method)
    if tail is not None:
        keep = int(tail.group(1))
        # text[-0:] would be the whole string, so keep == 0 needs its own case.
        suffix = _to_text(value)[-keep:] if keep else ''
        return '****' + suffix

    raise ValueError(
        '未知脱敏方法 {0!r}(可用 md5 / month_trunc / mask_middle / keep_first_n / keep_last_n)'.format(method)
    )
def apply_mask(event_name, properties, conf):
    """Apply the masking rule of one event to its ``properties`` dict.

    The input is never modified: when a rule applies, a deep copy is edited
    and returned. Rules act on two layers, the top level of ``properties``
    and, when it is a dict, the nested ``properties['params']``.

    Args:
        event_name: Key used to look up the rule in ``conf``.
        properties: Parsed properties dict of the event, or ``None``.
        conf: Mapping ``{event_name: {'drop': [field, ...],
            'mask': {field: method}}}``; both rule keys are optional.

    Returns:
        ``properties`` itself (same object) when ``conf`` holds no non-empty
        rule for ``event_name`` or when ``properties`` is ``None``; otherwise
        a new dict with ``drop`` fields removed and ``mask`` fields rewritten
        through ``apply_method``.

    Raises:
        ValueError: Propagated from ``apply_method`` for an unknown method.
    """
    rule = conf.get(event_name)
    # No rule -> pass through untouched (fallback for undeclared events).
    # None has nothing to mask; without this guard it would crash on .get()
    # below, although the no-rule path already passes None through.
    if not rule or properties is None:
        return properties

    result = deepcopy(properties)
    layers = [result]
    params = result.get('params')
    if isinstance(params, dict):
        layers.append(params)

    # Drop runs before mask, so a field listed in both is removed rather
    # than masked.
    for field in rule.get('drop', ()):
        for layer in layers:
            layer.pop(field, None)

    for field, method in rule.get('mask', {}).items():
        for layer in layers:
            if field in layer:
                layer[field] = apply_method(method, layer[field])
    return result
def load_mask_conf(path):
    """Parse the masking config file into a per-event rule mapping.

    Only sections whose name starts with ``_EVENT_PREFIX`` are read; all
    other sections are skipped. Inside an event section, ``drop`` is a
    comma-separated list of field names and ``mask`` is a comma-separated
    list of ``field:method`` items. Empty items are ignored and surrounding
    whitespace is stripped from event names, fields and methods.

    Args:
        path: Path of the INI file; it is read as UTF-8.

    Returns:
        ``{event_name: {'drop': [field, ...], 'mask': {field: method}}}``.
        A rule only contains the keys its section declares.

    Raises:
        FileNotFoundError: ``path`` is not an existing file.
        ValueError: A ``mask`` item is not of the form ``field:method``, has
            an empty field name, or names an unknown method.
    """
    # ConfigParser.read() silently skips missing files, and a missing masking
    # config would let sensitive values reach raw storage unmasked, so the
    # existence check is explicit and loud.
    if not os.path.isfile(path):
        raise FileNotFoundError('脱敏配置不存在:{0}'.format(path))

    parser = ConfigParser()
    # Disable the default lower-casing of option names.
    parser.optionxform = str
    parser.read(path, encoding='utf-8')

    conf = {}
    for section in parser.sections():
        if not section.startswith(_EVENT_PREFIX):
            continue
        # ConfigParser keeps whitespace inside the brackets; without the
        # strip, '[event: login]' would be stored as ' login' and never match
        # an incoming event name, silently disabling its masking.
        event = section[len(_EVENT_PREFIX):].strip()
        rule = {}

        if parser.has_option(section, 'drop'):
            raw_fields = parser.get(section, 'drop').split(',')
            rule['drop'] = [name.strip() for name in raw_fields if name.strip()]

        if parser.has_option(section, 'mask'):
            mask = {}
            for item in parser.get(section, 'mask').split(','):
                item = item.strip()
                if not item:
                    continue
                field, sep, method = item.partition(':')
                field, method = field.strip(), method.strip()
                # A missing ':' or an empty field name (':md5') is a
                # malformed item; an empty method is rejected just below.
                if not sep or not field:
                    raise ValueError('[mask] 项格式应为 field:method,实际 {0!r}(section [{1}])'.format(item, section))
                if not _known_method(method):
                    raise ValueError(
                        '未知脱敏方法 {0!r}(section [{1}],可用 md5 / month_trunc / '
                        'mask_middle / keep_first_n / keep_last_n)'.format(method, section)
                    )
                mask[field] = method
            rule['mask'] = mask

        conf[event] = rule
    return conf
|