| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- # -*- coding:utf-8 -*-
- """
- DataX reader 侧的字段级脱敏工具:ini 里声明 [mask] 段,自动生成 querySql。
- 合规硬约束:脱敏 SQL 表达式在源 DB 端执行,敏感原值不出业务库。
- 见 kb/90 §2.6 高优先级 ADR。
- 扩展方式:
- - 加新 DB:在 MASK_TEMPLATES / DYNAMIC_MASK_TEMPLATES 顶层加一个 db_type key
- - 加新静态类型(固定 SQL):在 MASK_TEMPLATES[db_type] 加一项
- - 加新动态类型(带数值参数如 keep_last_{n}):在 DYNAMIC_MASK_TEMPLATES[db_type] 加一项
- """
- import re
- from typing import Callable, Dict, List, Tuple
- # 静态脱敏模板:mask_type → SQL 表达式(含 {col} 占位)
- MASK_TEMPLATES = {
- 'postgresql': {
- 'month_trunc': "TO_CHAR({col}, 'YYYY-MM') AS {col}",
- 'md5': "MD5({col}::text) AS {col}",
- 'mask_middle': "REGEXP_REPLACE({col}::text, '(.{{3}}).+(.{{4}})', '\\1****\\2') AS {col}",
- },
- } # type: Dict[str, Dict[str, str]]
- # 动态脱敏模板:mask_type 形如 keep_last_{n} / keep_first_{n},regex 捕获数字参数
- # 每项 (regex, template_fn),template_fn(*groups) 返回含 {col} 占位的模板字符串
- DYNAMIC_MASK_TEMPLATES = {
- 'postgresql': [
- (re.compile(r'^keep_last_(\d+)$'),
- lambda n: "'****' || RIGHT({{col}}::text, {n}) AS {{col}}".format(n=n)),
- (re.compile(r'^keep_first_(\d+)$'),
- lambda n: "LEFT({{col}}::text, {n}) || '****' AS {{col}}".format(n=n)),
- ],
- } # type: Dict[str, List[Tuple[re.Pattern, Callable[..., str]]]]
- # 列名白名单(防 SQL 注入):字母数字下划线,首字母非数字
- _COL_NAME_PATTERN = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')
- def resolve_template(db_type, mask_type):
- """
- 返回 mask_type 对应的 SQL 模板(含 {col} 占位);找不到抛 ValueError。
- """
- static = MASK_TEMPLATES.get(db_type, {})
- if mask_type in static:
- return static[mask_type]
- dynamic = DYNAMIC_MASK_TEMPLATES.get(db_type, [])
- for pattern, template_fn in dynamic:
- m = pattern.match(mask_type)
- if m:
- return template_fn(*m.groups())
- raise ValueError(
- '未知脱敏类型 {mask_type!r} (db_type={db_type!r});'
- '可用静态类型 {static},动态类型 {dynamic}'.format(
- mask_type=mask_type, db_type=db_type,
- static=list(static.keys()),
- dynamic=[p.pattern for p, _ in dynamic],
- )
- )
- def build_query_sql(db_type, columns, mask_config, table, where):
- """
- 按 [mask] 声明生成 querySql;脱敏 SQL 在源 DB 执行,敏感原值不出库。
- Args:
- db_type: 目标 reader 的 DB 类型,如 'postgresql'
- columns: reader.column 列表(List[str])
- mask_config: [mask] 段 {列名: 脱敏类型},如 {'cert_birthday': 'month_trunc'}
- table: 表名(含 schema,如 'public.app_user_cert_info')
- where: WHERE 子句内容(不含 "WHERE" 关键字;占位符由调用侧替换)
- Returns:
- 完整 SELECT 语句字符串
- Raises:
- ValueError: 非法列名 / 未知脱敏类型 / mask 列不在 column 中
- """
- for col in columns:
- if not _COL_NAME_PATTERN.match(col):
- raise ValueError('非法列名 {col!r},只允许 [A-Za-z_][A-Za-z0-9_]*'.format(col=col))
- for col, mask_type in mask_config.items():
- if col not in columns:
- raise ValueError('[mask] 列 {col!r} 不在 reader.column 中'.format(col=col))
- resolve_template(db_type, mask_type)
- select_parts = []
- for col in columns:
- if col in mask_config:
- template = resolve_template(db_type, mask_config[col])
- select_parts.append(template.format(col=col))
- else:
- select_parts.append(col)
- sql = 'SELECT {cols} FROM {table}'.format(cols=', '.join(select_parts), table=table)
- if where.strip():
- sql += ' WHERE {where}'.format(where=where)
- return sql
|