Przeglądaj źródła

feat(datax): 新增 mask 模块 + PG 5 典型脱敏(3 静态 + 2 动态)

静态:month_trunc / md5 / mask_middle
动态:keep_last_{n} / keep_first_{n}(正则捕获 n)
列名 + mask_type 双白名单校验防 SQL 注入
9 条单测覆盖静态/动态/综合/列名非法/未知类型/列不在 column

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 2 tygodni temu
rodzic
commit
b791d049d9
3 zmienionych plików z 156 dodań i 0 usunięć
  1. 97 0
      dw_base/datax/mask.py
  2. 0 0
      tests/unit/datax/__init__.py
  3. 59 0
      tests/unit/datax/test_mask.py

+ 97 - 0
dw_base/datax/mask.py

@@ -0,0 +1,97 @@
+# -*- coding:utf-8 -*-
+"""
+DataX reader 侧的字段级脱敏工具:ini 里声明 [mask] 段,自动生成 querySql。
+
+合规硬约束:脱敏 SQL 表达式在源 DB 端执行,敏感原值不出业务库。
+见 kb/90 §2.6 高优先级 ADR。
+
+扩展方式:
+- 加新 DB:在 MASK_TEMPLATES / DYNAMIC_MASK_TEMPLATES 顶层加一个 db_type key
+- 加新静态类型(固定 SQL):在 MASK_TEMPLATES[db_type] 加一项
+- 加新动态类型(带数值参数如 keep_last_{n}):在 DYNAMIC_MASK_TEMPLATES[db_type] 加一项
+"""
+import re
+from typing import Callable, Dict, List, Tuple
+
+# Static mask templates: db_type -> mask_type -> SQL expression containing the {col} placeholder.
+# Templates are rendered with str.format(col=...), so braces that must reach the SQL
+# literally (the regex quantifiers in mask_middle) are written doubled.
+MASK_TEMPLATES = {
+    'postgresql': {
+        # Format the column as a 'YYYY-MM' string.
+        'month_trunc': "TO_CHAR({col}, 'YYYY-MM') AS {col}",
+        # MD5 of the column's text form.
+        'md5': "MD5({col}::text) AS {col}",
+        # Keep the first 3 and last 4 characters and replace what lies between with ****.
+        # NOTE(review): the pattern needs at least 8 characters to match; shorter values
+        # presumably come back unmasked -- confirm against PostgreSQL REGEXP_REPLACE behavior.
+        'mask_middle': "REGEXP_REPLACE({col}::text, '(.{{3}}).+(.{{4}})', '\\1****\\2') AS {col}",
+    },
+}  # type: Dict[str, Dict[str, str]]
+
+# Dynamic mask templates: mask_type looks like keep_last_{n} / keep_first_{n}; the regex
+# captures the numeric parameter.
+# Each entry is (regex, template_fn); the resolver calls template_fn(*groups) and expects a
+# template string that still contains the {col} placeholder (hence the doubled braces).
+# The regexes act as a whitelist for text that ends up inside SQL, so they are strict:
+# - [0-9] instead of \d, because \d also matches non-ASCII Unicode digits in Python 3;
+# - \Z instead of $, because $ also matches just before a trailing newline.
+DYNAMIC_MASK_TEMPLATES = {
+    'postgresql': [
+        (re.compile(r'^keep_last_([0-9]+)\Z'),
+         lambda n: "'****' || RIGHT({{col}}::text, {n}) AS {{col}}".format(n=n)),
+        (re.compile(r'^keep_first_([0-9]+)\Z'),
+         lambda n: "LEFT({{col}}::text, {n}) || '****' AS {{col}}".format(n=n)),
+    ],
+}  # type: Dict[str, List[Tuple[re.Pattern, Callable[..., str]]]]
+
+# Column-name whitelist (SQL-injection guard): ASCII letters, digits and underscore,
+# not starting with a digit. Anchored with \Z rather than $, because $ also matches
+# just before a trailing newline and would let 'id\n' through.
+_COL_NAME_PATTERN = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*\Z')
+
+
+def resolve_template(db_type, mask_type):
+    """
+    Return the SQL template (still containing the ``{col}`` placeholder) for a mask type.
+
+    Static types are looked up by exact name in ``MASK_TEMPLATES[db_type]``. Otherwise
+    each ``(regex, template_fn)`` pair in ``DYNAMIC_MASK_TEMPLATES[db_type]`` is tried in
+    order; the first regex matching the whole ``mask_type`` wins and its captured groups
+    are passed to ``template_fn``.
+
+    Args:
+        db_type: key into the template tables, e.g. 'postgresql'
+        mask_type: mask type name, e.g. 'md5' or 'keep_last_4'
+
+    Returns:
+        Template string to be rendered with ``.format(col=...)``
+
+    Raises:
+        ValueError: mask_type is neither a static nor a dynamic type for db_type
+            (an unknown db_type has no types at all, so it also ends up here)
+    """
+    static = MASK_TEMPLATES.get(db_type, {})
+    if mask_type in static:
+        return static[mask_type]
+    dynamic = DYNAMIC_MASK_TEMPLATES.get(db_type, [])
+    for pattern, template_fn in dynamic:
+        # fullmatch rather than match: with a '$'-anchored pattern, match() would also
+        # accept a value carrying a trailing newline (e.g. 'keep_last_4\n').
+        m = pattern.fullmatch(mask_type)
+        if m:
+            return template_fn(*m.groups())
+    raise ValueError(
+        '未知脱敏类型 {mask_type!r} (db_type={db_type!r});'
+        '可用静态类型 {static},动态类型 {dynamic}'.format(
+            mask_type=mask_type, db_type=db_type,
+            static=list(static.keys()),
+            dynamic=[p.pattern for p, _ in dynamic],
+        )
+    )
+
+
+def build_query_sql(db_type, columns, mask_config, table, where):
+    """
+    Build the reader querySql from the [mask] declaration.
+
+    Masked columns are replaced by their SQL masking expression so that masking runs
+    inside the source database; all other columns are selected as-is.
+
+    Args:
+        db_type: DB type of the reader, e.g. 'postgresql'
+        columns: reader.column list (List[str]); every name is checked against the
+            column-name whitelist
+        mask_config: [mask] section as {column name: mask type},
+            e.g. {'cert_birthday': 'month_trunc'}
+        table: table name (may include schema, e.g. 'public.app_user_cert_info');
+            interpolated verbatim and NOT validated here
+        where: WHERE clause body without the "WHERE" keyword; interpolated verbatim
+            (placeholders are substituted by the caller). None, empty or
+            whitespace-only means no WHERE clause.
+
+    Returns:
+        The complete SELECT statement as a string
+
+    Raises:
+        ValueError: illegal column name / unknown mask type / masked column missing
+            from ``columns``
+    """
+    for col in columns:
+        # fullmatch rather than match: a '$'-anchored pattern used with match() would
+        # accept a name with a trailing newline (e.g. 'id\n').
+        if not _COL_NAME_PATTERN.fullmatch(col):
+            raise ValueError('非法列名 {col!r},只允许 [A-Za-z_][A-Za-z0-9_]*'.format(col=col))
+
+    # Validate every [mask] entry before building anything, and keep the resolved
+    # template so each mask type is resolved only once.
+    templates = {}
+    for col, mask_type in mask_config.items():
+        if col not in columns:
+            raise ValueError('[mask] 列 {col!r} 不在 reader.column 中'.format(col=col))
+        templates[col] = resolve_template(db_type, mask_type)
+
+    select_parts = [
+        templates[col].format(col=col) if col in templates else col
+        for col in columns
+    ]
+    sql = 'SELECT {cols} FROM {table}'.format(cols=', '.join(select_parts), table=table)
+    # Guard against None so a missing filter does not crash on .strip().
+    if where and where.strip():
+        sql += ' WHERE {where}'.format(where=where)
+    return sql

+ 0 - 0
tests/unit/datax/__init__.py


+ 59 - 0
tests/unit/datax/test_mask.py

@@ -0,0 +1,59 @@
+# -*- coding:utf-8 -*-
+import pytest
+
+from dw_base.datax.mask import build_query_sql, resolve_template
+
+
+def test_static_month_trunc():
+    """The static 'month_trunc' type resolves to the TO_CHAR year-month template."""
+    expected = "TO_CHAR({col}, 'YYYY-MM') AS {col}"
+    assert resolve_template('postgresql', 'month_trunc') == expected
+
+
+def test_static_md5():
+    """The static 'md5' type resolves to the MD5-over-text template."""
+    expected = 'MD5({col}::text) AS {col}'
+    assert resolve_template('postgresql', 'md5') == expected
+
+
+def test_dynamic_keep_last_4():
+    """'keep_last_4' yields a template that keeps the right-most 4 characters."""
+    expected = "'****' || RIGHT({col}::text, 4) AS {col}"
+    assert resolve_template('postgresql', 'keep_last_4') == expected
+
+
+def test_dynamic_keep_last_10():
+    """A multi-digit parameter is carried into the RIGHT() call unchanged."""
+    template = resolve_template('postgresql', 'keep_last_10')
+    expected_fragment = 'RIGHT({col}::text, 10)'
+    assert expected_fragment in template
+
+
+def test_dynamic_keep_first_3():
+    """'keep_first_3' yields a template that keeps the left-most 3 characters."""
+    expected = "LEFT({col}::text, 3) || '****' AS {col}"
+    assert resolve_template('postgresql', 'keep_first_3') == expected
+
+
+def test_build_query_sql_mixed():
+    """A mix of plain, statically masked and dynamically masked columns builds one SELECT."""
+    masks = {'cert_birthday': 'month_trunc', 'phone': 'keep_last_4'}
+    query = build_query_sql(
+        db_type='postgresql',
+        columns=['id', 'cert_birthday', 'phone'],
+        mask_config=masks,
+        table='public.t',
+        where="create_time >= '${start_date}'",
+    )
+    # Each fragment must appear somewhere in the generated statement.
+    expected_fragments = (
+        'SELECT id,',
+        "TO_CHAR(cert_birthday, 'YYYY-MM') AS cert_birthday",
+        "RIGHT(phone::text, 4)",
+        'FROM public.t',
+        "WHERE create_time >= '${start_date}'",
+    )
+    for fragment in expected_fragments:
+        assert fragment in query
+
+
+def test_invalid_col_name_rejects_injection():
+    """A column name carrying SQL text is rejected with the illegal-column-name error."""
+    hostile_columns = ['id; DROP TABLE x']
+    with pytest.raises(ValueError, match='非法列名'):
+        build_query_sql('postgresql', hostile_columns, {}, 'public.t', '')
+
+
+def test_unknown_mask_type():
+    """A mask type that is neither static nor dynamic raises the unknown-type error."""
+    masks = {'phone': 'bogus_mask'}
+    with pytest.raises(ValueError, match='未知脱敏类型'):
+        build_query_sql('postgresql', ['phone'], masks, 'public.t', '')
+
+
+def test_mask_col_not_in_columns():
+    """Masking a column that is absent from the reader column list is an error."""
+    masks = {'phone': 'md5'}
+    with pytest.raises(ValueError, match='不在 reader.column 中'):
+        build_query_sql('postgresql', ['id'], masks, 'public.t', '')