# -*- coding:utf-8 -*- """ 埋点入仓的字段级脱敏:在已解析的 properties dict 上按事件应用 drop / mask。 与 dw_base/datax/mask.py 的区别: - datax/mask.py 在源 PG 端生成脱敏 querySql,SQL 在业务库执行(敏感原值不出业务库); - 本模块作用在已解析的 JSON dict 上 —— 埋点无源库,gz 文件已从固定服务器导出, 脱敏在入 raw 前的 Spark UDF 里做(kb/13 §4 合规破例)。 两者共用同一套方法语义,单一真值见 conf/templates/datax/mask/mask.template.ini。 兜底:未在配置声明的事件 → properties 原样返回,靠协作流程补含敏事件(kb/13 §6)。 """ import hashlib import os import re from configparser import ConfigParser from copy import deepcopy _EVENT_PREFIX = 'event:' _KEEP_FIRST = re.compile(r'^keep_first_(\d+)$') _KEEP_LAST = re.compile(r'^keep_last_(\d+)$') _STATIC_METHODS = ('md5', 'month_trunc', 'mask_middle') def _to_text(value): return value if isinstance(value, str) else str(value) def _known_method(method): return method in _STATIC_METHODS or bool(_KEEP_FIRST.match(method)) or bool(_KEEP_LAST.match(method)) def apply_method(method, value): """按方法名脱敏单个值;value 为 None 原样返回;未知方法抛 ValueError。""" if value is None: return None if method == 'md5': return hashlib.md5(_to_text(value).encode('utf-8')).hexdigest() if method == 'month_trunc': # 'YYYY-MM-DD...' / 'YYYY/MM/...' 截断到 'YYYY-MM',不匹配则原样返回 m = re.match(r'^(\d{4})[-/](\d{2})', _to_text(value)) return '{0}-{1}'.format(m.group(1), m.group(2)) if m else _to_text(value) if method == 'mask_middle': # 对齐 datax 正则 (.{3}).+(.{4}):长度 < 8 不脱敏 text = _to_text(value) return text[:3] + '****' + text[-4:] if len(text) >= 8 else text m = _KEEP_FIRST.match(method) if m: n = int(m.group(1)) return _to_text(value)[:n] + '****' m = _KEEP_LAST.match(method) if m: n = int(m.group(1)) return '****' + (_to_text(value)[-n:] if n else '') raise ValueError( '未知脱敏方法 {0!r}(可用 md5 / month_trunc / mask_middle / keep_first_n / keep_last_n)'.format(method) ) def apply_mask(event_name, properties, conf): """ 对单条事件的 properties dict 应用脱敏,返回脱敏后的新 dict(不修改入参)。 - 同时作用于 properties 顶层与嵌套的 properties['params'] - drop 字段整删,mask 字段调对应 method - 同一字段同时在 drop 和 mask 时 drop 优先 - event_name 未在 conf → properties 原样返回(兜底) """ rule = conf.get(event_name) if not rule: return properties result = deepcopy(properties) layers = [result] params = result.get('params') if isinstance(params, dict): layers.append(params) for field in rule.get('drop', ()): for layer in layers: layer.pop(field, None) for field, method in rule.get('mask', {}).items(): for layer in layers: if field in layer: layer[field] = apply_method(method, layer[field]) return result def load_mask_conf(path): """ 解析脱敏配置 → {event_name: {'drop': [...field], 'mask': {field: method}}}。 文件不存在直接抛错:脱敏配置缺失会导致敏感数据原样入 raw,不可静默 (ConfigParser.read 对缺失文件静默返回,故先显式校验存在性)。 """ if not os.path.isfile(path): raise FileNotFoundError('脱敏配置不存在:{0}'.format(path)) parser = ConfigParser() parser.optionxform = str # 保留字段名大小写(埋点字段是驼峰) parser.read(path, encoding='utf-8') conf = {} for section in parser.sections(): if not section.startswith(_EVENT_PREFIX): continue event = section[len(_EVENT_PREFIX):] rule = {} if parser.has_option(section, 'drop'): rule['drop'] = [f.strip() for f in parser.get(section, 'drop').split(',') if f.strip()] if parser.has_option(section, 'mask'): mask = {} for item in parser.get(section, 'mask').split(','): item = item.strip() if not item: continue if ':' not in item: raise ValueError('[mask] 项格式应为 field:method,实际 {0!r}(section [{1}])'.format(item, section)) field, method = item.split(':', 1) field, method = field.strip(), method.strip() if not _known_method(method): raise ValueError( '未知脱敏方法 {0!r}(section [{1}],可用 md5 / month_trunc / ' 'mask_middle / keep_first_n / keep_last_n)'.format(method, section) ) mask[field] = method rule['mask'] = mask conf[event] = rule return conf