Просмотр исходного кода

feat(bin): datax-sync-template-gen 加 --mask-conf(按 mask 配置生成几乎可用 ini)

新增 --mask-conf <PATH> 选项:
- 不传:当前行为,全字段 ini + md(脱敏类型列空白)
- 传:读 [mask] section 拿 {field: method},trim 字段从 reader.column 剔除
  (保持 PG 原顺序),其他 method 渲染 [mask] 段;md 脱敏类型列填好

配置格式(jobs/raw/{域}/{table}.mask.ini):
  [mask]
  field1 = trim
  field2 = mask_middle
  field3 = md5

method ∈ trim / md5 / month_trunc / mask_middle / keep_first_n / keep_last_n。
trim = 字段不入 raw(reader column 不查询),其他 = 字段入 raw 由
dw_base.datax.mask 在 reader 端脱敏。

md 列名"裁剪类型"改为"脱敏类型"(值含 trim 或具体 mask 方法),
不再区分明性/隐性(过渡概念退场)。
tianyu.chu 1 неделя назад
Родитель
Сommit
40fc29456c
2 измененных файлов с 118 добавлено и 15 удалено
  1. 59 12
      bin/datax-sync-template-gen.py
  2. 59 3
      tests/unit/datax/test_sync_template_gen.py

+ 59 - 12
bin/datax-sync-template-gen.py

@@ -28,6 +28,7 @@ import argparse
 import os
 import re
 import sys
+from configparser import ConfigParser
 from datetime import datetime
 
 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -93,28 +94,65 @@ def query_columns_full(conn, schema, table):
     return cur.fetchall()
 
 
-def render_schema_md(rows):
-    """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 裁剪类型(空,开发者填)"""
+def load_mask_conf(path):
+    """读 mask 配置 ini,返回 {field: method} dict。
+
+    格式(与 jobs/raw/{域}/{table}.mask.ini 同款):
+        [mask]
+        field1 = method1
+        field2 = method2
+
+    method ∈ trim / md5 / month_trunc / mask_middle / keep_first_n / keep_last_n
+    - trim:整字段不入 raw(reader column 不查询)
+    - 其他:字段入 raw,由 dw_base.datax.mask 在 reader 端脱敏
+    """
+    cp = ConfigParser()
+    cp.read(path, encoding='utf-8')
+    if not cp.has_section('mask'):
+        return {}
+    return dict(cp.items('mask'))
+
+
+def render_schema_md(rows, mask_dict=None):
+    """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 脱敏类型。
+
+    mask_dict 不传时脱敏类型列为空白;传入时填字段对应的 method(含 trim)。
+    """
     lines = [
-        '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 裁剪类型 |',
+        '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |',
         '| --- | --- | --- | --- | --- | --- |',
     ]
+    methods = mask_dict or {}
     for num, name, comment, typ, pk in rows:
-        lines.append('| {} | `{}` | {} | {} | {} |  |'.format(
-            num, name, comment or '', typ, pk))
+        method = methods.get(name, '')
+        lines.append('| {} | `{}` | {} | {} | {} | {} |'.format(
+            num, name, comment or '', typ, pk, method))
     return '\n'.join(lines) + '\n'
 
 
-def render_template(ds_ref, database, schema, table, columns, pk):
+def render_template(ds_ref, database, schema, table, columns, pk, mask_methods=None):
+    """渲染 sync ini 模板。
+
+    columns: [(name, comment), ...] 已剔除 trim 字段,保持 PG 原顺序
+    mask_methods: {field: method} 仅含非 trim 方法(mask_middle / month_trunc 等),
+                  渲染 [mask] 段;空 dict 或 None 时不渲染 [mask] 段
+    """
     column_str = ','.join(c for c, _ in columns)
     today = datetime.now().strftime('%Y-%m-%d')
+
+    if mask_methods:
+        mask_lines = '\n'.join('{} = {}'.format(f, m) for f, m in mask_methods.items())
+        mask_section = '[mask]\n' + mask_lines + '\n\n'
+    else:
+        mask_section = ''
+
     return (
         '; 作者:<TODO>\n'
         '; 日期:{today}\n'
         '; 工单:<TODO>\n'
         '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
         '; 状态:[待执行]\n'
-        '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 [mask] /\n'
+        '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 mask 段 /\n'
         ';       调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
         ';\n'
         '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
@@ -130,6 +168,7 @@ def render_template(ds_ref, database, schema, table, columns, pk):
         'splitPk = {pk}\n'
         'fetchSize = 1000\n'
         '\n'
+        '{mask_section}'
         '[writer]\n'
         'dataSource = hdfs/<TODO>\n'
         'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
@@ -142,7 +181,7 @@ def render_template(ds_ref, database, schema, table, columns, pk):
         'fieldDelimiter = \\t\n'
     ).format(
         today=today, ds_ref=ds_ref, database=database, schema=schema,
-        table=table, column_str=column_str, pk=pk,
+        table=table, column_str=column_str, pk=pk, mask_section=mask_section,
     )
 
 
@@ -157,6 +196,8 @@ def main():
                         help='schema 限定的表名(如 public.card_group_order_info)')
     parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
                         help='输出目录(不传 stdout 同时打印 md 表 + ini 模板;不带值 workspace/{yyyymmdd}/ 写两文件;带值自定义目录写两文件)')
+    parser.add_argument('--mask-conf', default=None, metavar='PATH',
+                        help='mask 配置 ini 路径({table}.mask.ini)。传入时按配置剔除 trim 字段 + 渲染 [mask] 段,md 脱敏类型列填好;不传时全字段输出,md 脱敏类型列空白')
     args = parser.parse_args()
 
     if '.' not in args.t:
@@ -184,12 +225,18 @@ def main():
         conn.close()
 
     # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
-    columns = [(r[1], r[2] or '') for r in full_rows]
+    mask_dict = load_mask_conf(args.mask_conf) if args.mask_conf else {}
+    trim_set = {f for f, m in mask_dict.items() if m == 'trim'}
+    non_trim_mask = {f: m for f, m in mask_dict.items() if m != 'trim'}
+
+    # 已剔除 trim 字段的 column 列表,保持 PG 原顺序(attnum 升序)
+    columns = [(r[1], r[2] or '') for r in full_rows if r[1] not in trim_set]
+
     pk_names = [r[1] for r in full_rows if r[4] == 'PK']
-    pk = pk_names[0] if len(pk_names) == 1 else ''  # 复合主键 / 无主键 → 空
+    pk = pk_names[0] if len(pk_names) == 1 and pk_names[0] not in trim_set else ''
 
-    md_content = render_schema_md(full_rows)
-    ini_content = render_template(args.ds, database, schema, table, columns, pk)
+    md_content = render_schema_md(full_rows, mask_dict)
+    ini_content = render_template(args.ds, database, schema, table, columns, pk, non_trim_mask)
 
     if args.o is None:
         # stdout:先 md 表后 ini 模板

+ 59 - 3
tests/unit/datax/test_sync_template_gen.py

@@ -64,6 +64,25 @@ def test_render_template_includes_required_fields():
     assert "where = update_time >= '${start_date}' AND update_time < '${stop_date}'" in out
     assert 'path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/' in out
     assert 'fileName = users_TODO_d' in out
+    # 不传 mask_methods 时不渲染 [mask] section header
+    assert '\n[mask]\n' not in out
+
+
+def test_render_template_with_mask_methods():
+    columns = [('id', 'id'), ('user_name', '用户名'), ('phone', '手机号')]
+    out = GEN.render_template(
+        ds_ref='postgresql/prod-hobby', database='db', schema='public',
+        table='users', columns=columns, pk='id',
+        mask_methods={'user_name': 'mask_middle', 'phone': 'md5'},
+    )
+    # [mask] section header 在 [reader] 后 [writer] 前
+    assert '\n[mask]\n' in out
+    assert 'user_name = mask_middle' in out
+    assert 'phone = md5' in out
+    reader_idx = out.index('\n[reader]\n')
+    mask_idx = out.index('\n[mask]\n')
+    writer_idx = out.index('\n[writer]\n')
+    assert reader_idx < mask_idx < writer_idx
 
 
 def test_query_columns_full_returns_full_metadata():
@@ -80,19 +99,56 @@ def test_query_columns_full_returns_full_metadata():
     ]
 
 
-def test_render_schema_md_table_format():
+def test_render_schema_md_no_mask_dict_blank_column():
     rows = [
         (1, 'id', 'id', 'bigint', 'PK'),
         (2, 'user_name', '用户名', 'character varying', ''),
-        (3, 'create_time', None, 'timestamp without time zone', ''),  # 无注释
+        (3, 'create_time', None, 'timestamp without time zone', ''),
     ]
     out = GEN.render_schema_md(rows)
-    assert '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 裁剪类型 |' in out
+    assert '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |' in out
     assert '| 1 | `id` | id | bigint | PK |  |' in out
     assert '| 2 | `user_name` | 用户名 | character varying |  |  |' in out
     assert '| 3 | `create_time` |  | timestamp without time zone |  |  |' in out
 
 
+def test_render_schema_md_with_mask_dict():
+    rows = [
+        (1, 'id', 'id', 'bigint', 'PK'),
+        (2, 'user_name', '用户名', 'character varying', ''),
+        (3, 'phone', '手机号', 'character varying', ''),
+        (4, 'merchant_open', '商家代开', 'smallint', ''),
+    ]
+    mask_dict = {'phone': 'md5', 'merchant_open': 'trim', 'user_name': 'mask_middle'}
+    out = GEN.render_schema_md(rows, mask_dict)
+    assert '| 1 | `id` | id | bigint | PK |  |' in out
+    assert '| 2 | `user_name` | 用户名 | character varying |  | mask_middle |' in out
+    assert '| 3 | `phone` | 手机号 | character varying |  | md5 |' in out
+    assert '| 4 | `merchant_open` | 商家代开 | smallint |  | trim |' in out
+
+
+def test_load_mask_conf_basic(tmp_path):
+    p = tmp_path / 't.mask.ini'
+    p.write_text(
+        '[mask]\n'
+        'payment_num = trim\n'
+        'phone = md5\n'
+        'name = mask_middle\n',
+        encoding='utf-8',
+    )
+    assert GEN.load_mask_conf(str(p)) == {
+        'payment_num': 'trim',
+        'phone': 'md5',
+        'name': 'mask_middle',
+    }
+
+
+def test_load_mask_conf_no_section_returns_empty(tmp_path):
+    p = tmp_path / 't.mask.ini'
+    p.write_text('[other]\nfoo = bar\n', encoding='utf-8')
+    assert GEN.load_mask_conf(str(p)) == {}
+
+
 def test_render_template_empty_pk():
     out = GEN.render_template(
         ds_ref='postgresql/prod-hobby', database='db', schema='public',