mask.py 3.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataX reader 侧的字段级脱敏工具:ini 里声明 [mask] 段,自动生成 querySql。
  4. 合规硬约束:脱敏 SQL 表达式在源 DB 端执行,敏感原值不出业务库。
  5. 见 kb/90 §2.6 高优先级 ADR。
  6. 扩展方式:
  7. - 加新 DB:在 MASK_TEMPLATES / DYNAMIC_MASK_TEMPLATES 顶层加一个 db_type key
  8. - 加新静态类型(固定 SQL):在 MASK_TEMPLATES[db_type] 加一项
  9. - 加新动态类型(带数值参数如 keep_last_{n}):在 DYNAMIC_MASK_TEMPLATES[db_type] 加一项
  10. """
  11. import re
  12. from typing import Callable, Dict, List, Tuple
  13. # 静态脱敏模板:mask_type → SQL 表达式(含 {col} 占位)
  14. MASK_TEMPLATES = {
  15. 'postgresql': {
  16. 'month_trunc': "TO_CHAR({col}, 'YYYY-MM') AS {col}",
  17. 'md5': "MD5({col}::text) AS {col}",
  18. 'mask_middle': "REGEXP_REPLACE({col}::text, '(.{{3}}).+(.{{4}})', '\\1****\\2') AS {col}",
  19. },
  20. } # type: Dict[str, Dict[str, str]]
  21. # 动态脱敏模板:mask_type 形如 keep_last_{n} / keep_first_{n},regex 捕获数字参数
  22. # 每项 (regex, template_fn),template_fn(*groups) 返回含 {col} 占位的模板字符串
  23. DYNAMIC_MASK_TEMPLATES = {
  24. 'postgresql': [
  25. (re.compile(r'^keep_last_(\d+)$'),
  26. lambda n: "'****' || RIGHT({{col}}::text, {n}) AS {{col}}".format(n=n)),
  27. (re.compile(r'^keep_first_(\d+)$'),
  28. lambda n: "LEFT({{col}}::text, {n}) || '****' AS {{col}}".format(n=n)),
  29. ],
  30. } # type: Dict[str, List[Tuple[re.Pattern, Callable[..., str]]]]
  31. # 列名白名单(防 SQL 注入):字母数字下划线,首字母非数字
  32. _COL_NAME_PATTERN = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')
  33. def resolve_template(db_type, mask_type):
  34. """
  35. 返回 mask_type 对应的 SQL 模板(含 {col} 占位);找不到抛 ValueError。
  36. """
  37. static = MASK_TEMPLATES.get(db_type, {})
  38. if mask_type in static:
  39. return static[mask_type]
  40. dynamic = DYNAMIC_MASK_TEMPLATES.get(db_type, [])
  41. for pattern, template_fn in dynamic:
  42. m = pattern.match(mask_type)
  43. if m:
  44. return template_fn(*m.groups())
  45. raise ValueError(
  46. '未知脱敏类型 {mask_type!r} (db_type={db_type!r});'
  47. '可用静态类型 {static},动态类型 {dynamic}'.format(
  48. mask_type=mask_type, db_type=db_type,
  49. static=list(static.keys()),
  50. dynamic=[p.pattern for p, _ in dynamic],
  51. )
  52. )
  53. def build_query_sql(db_type, columns, mask_config, table, where):
  54. """
  55. 按 [mask] 声明生成 querySql;脱敏 SQL 在源 DB 执行,敏感原值不出库。
  56. Args:
  57. db_type: 目标 reader 的 DB 类型,如 'postgresql'
  58. columns: reader.column 列表(List[str])
  59. mask_config: [mask] 段 {列名: 脱敏类型},如 {'cert_birthday': 'month_trunc'}
  60. table: 表名(含 schema,如 'public.app_user_cert_info')
  61. where: WHERE 子句内容(不含 "WHERE" 关键字;占位符由调用侧替换)
  62. Returns:
  63. 完整 SELECT 语句字符串
  64. Raises:
  65. ValueError: 非法列名 / 未知脱敏类型 / mask 列不在 column 中
  66. """
  67. for col in columns:
  68. if not _COL_NAME_PATTERN.match(col):
  69. raise ValueError('非法列名 {col!r},只允许 [A-Za-z_][A-Za-z0-9_]*'.format(col=col))
  70. for col, mask_type in mask_config.items():
  71. if col not in columns:
  72. raise ValueError('[mask] 列 {col!r} 不在 reader.column 中'.format(col=col))
  73. resolve_template(db_type, mask_type)
  74. select_parts = []
  75. for col in columns:
  76. if col in mask_config:
  77. template = resolve_template(db_type, mask_config[col])
  78. select_parts.append(template.format(col=col))
  79. else:
  80. select_parts.append(col)
  81. sql = 'SELECT {cols} FROM {table}'.format(cols=', '.join(select_parts), table=table)
  82. if where.strip():
  83. sql += ' WHERE {where}'.format(where=where)
  84. return sql