# -*- coding:utf-8 -*- """ DataX reader 侧的字段级脱敏工具:ini 里声明 [mask] 段,自动生成 querySql。 合规硬约束:脱敏 SQL 表达式在源 DB 端执行,敏感原值不出业务库。 见 kb/90 §2.6 高优先级 ADR。 扩展方式: - 加新 DB:在 MASK_TEMPLATES / DYNAMIC_MASK_TEMPLATES 顶层加一个 db_type key - 加新静态类型(固定 SQL):在 MASK_TEMPLATES[db_type] 加一项 - 加新动态类型(带数值参数如 keep_last_{n}):在 DYNAMIC_MASK_TEMPLATES[db_type] 加一项 """ import re from typing import Callable, Dict, List, Tuple # 静态脱敏模板:mask_type → SQL 表达式(含 {col} 占位) MASK_TEMPLATES = { 'postgresql': { 'month_trunc': "TO_CHAR({col}, 'YYYY-MM') AS {col}", 'md5': "MD5({col}::text) AS {col}", 'mask_middle': "REGEXP_REPLACE({col}::text, '(.{{3}}).+(.{{4}})', '\\1****\\2') AS {col}", }, } # type: Dict[str, Dict[str, str]] # 动态脱敏模板:mask_type 形如 keep_last_{n} / keep_first_{n},regex 捕获数字参数 # 每项 (regex, template_fn),template_fn(*groups) 返回含 {col} 占位的模板字符串 DYNAMIC_MASK_TEMPLATES = { 'postgresql': [ (re.compile(r'^keep_last_(\d+)$'), lambda n: "'****' || RIGHT({{col}}::text, {n}) AS {{col}}".format(n=n)), (re.compile(r'^keep_first_(\d+)$'), lambda n: "LEFT({{col}}::text, {n}) || '****' AS {{col}}".format(n=n)), ], } # type: Dict[str, List[Tuple[re.Pattern, Callable[..., str]]]] # 列名白名单(防 SQL 注入):字母数字下划线,首字母非数字 _COL_NAME_PATTERN = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$') def resolve_template(db_type, mask_type): """ 返回 mask_type 对应的 SQL 模板(含 {col} 占位);找不到抛 ValueError。 """ static = MASK_TEMPLATES.get(db_type, {}) if mask_type in static: return static[mask_type] dynamic = DYNAMIC_MASK_TEMPLATES.get(db_type, []) for pattern, template_fn in dynamic: m = pattern.match(mask_type) if m: return template_fn(*m.groups()) raise ValueError( '未知脱敏类型 {mask_type!r} (db_type={db_type!r});' '可用静态类型 {static},动态类型 {dynamic}'.format( mask_type=mask_type, db_type=db_type, static=list(static.keys()), dynamic=[p.pattern for p, _ in dynamic], ) ) def build_query_sql(db_type, columns, mask_config, table, where): """ 按 [mask] 声明生成 querySql;脱敏 SQL 在源 DB 执行,敏感原值不出库。 Args: db_type: 目标 reader 的 DB 类型,如 'postgresql' columns: reader.column 列表(List[str]) mask_config: [mask] 段 {列名: 脱敏类型},如 {'cert_birthday': 'month_trunc'} table: 表名(含 schema,如 'public.app_user_cert_info') where: WHERE 子句内容(不含 "WHERE" 关键字;占位符由调用侧替换) Returns: 完整 SELECT 语句字符串 Raises: ValueError: 非法列名 / 未知脱敏类型 / mask 列不在 column 中 """ for col in columns: if not _COL_NAME_PATTERN.match(col): raise ValueError('非法列名 {col!r},只允许 [A-Za-z_][A-Za-z0-9_]*'.format(col=col)) for col, mask_type in mask_config.items(): if col not in columns: raise ValueError('[mask] 列 {col!r} 不在 reader.column 中'.format(col=col)) resolve_template(db_type, mask_type) select_parts = [] for col in columns: if col in mask_config: template = resolve_template(db_type, mask_config[col]) select_parts.append(template.format(col=col)) else: select_parts.append(col) sql = 'SELECT {cols} FROM {table}'.format(cols=', '.join(select_parts), table=table) if where.strip(): sql += ' WHERE {where}'.format(where=where) return sql