| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- # -*- coding:utf-8 -*-
- """
- datax-sync-template-gen 模板渲染 + JDBC URL 解析单测。
- 不连真 PG(query_columns_full 走 mock conn)。
- 脚本路径含连字符,用 importlib.util 动态加载为模块。
- """
- import importlib.util
- import os
- import sys
- from unittest.mock import MagicMock
- import pytest
# The project root is four directory levels above this test file.
PROJECT_ROOT = os.path.abspath(__file__)
for _depth in range(4):
    PROJECT_ROOT = os.path.dirname(PROJECT_ROOT)

# Location of the script under test, relative to the project root.
SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'datax-sync-template-gen.py')
def _load_script():
    """Load the script at SCRIPT_PATH as the module ``datax_sync_template_gen``.

    The script's file name contains hyphens, so it is loaded through
    ``importlib.util`` rather than a plain ``import`` statement. The module
    object is registered in ``sys.modules`` before its code is executed.

    Returns:
        The executed module object.
    """
    module_name = 'datax_sync_template_gen'
    script_spec = importlib.util.spec_from_file_location(module_name, SCRIPT_PATH)
    module = importlib.util.module_from_spec(script_spec)
    sys.modules[module_name] = module
    script_spec.loader.exec_module(module)
    return module


# Loaded once at import time; the tests in this file reach the script through GEN.
GEN = _load_script()
def test_parse_jdbc_url_with_port():
    """A JDBC URL with an explicit port yields that host, port and database."""
    parsed = GEN.parse_jdbc_url('jdbc:postgresql://10.0.0.1:5433/hobby_stocks')
    host, port, db = parsed
    assert (host, port, db) == ('10.0.0.1', 5433, 'hobby_stocks')
def test_parse_jdbc_url_default_port():
    """A JDBC URL without a port is expected to yield port 5432."""
    parsed = GEN.parse_jdbc_url('jdbc:postgresql://pg.example.com/mydb')
    host, port, db = parsed
    assert (host, port, db) == ('pg.example.com', 5432, 'mydb')
def test_parse_jdbc_url_invalid():
    """A ``mysql://`` URL is expected to raise ValueError with the parse-failure message."""
    unsupported_url = 'mysql://10.0.0.1:3306/foo'
    with pytest.raises(ValueError, match='无法解析'):
        GEN.parse_jdbc_url(unsupported_url)
def test_render_template_includes_required_fields():
    """Without mask_methods the rendered template contains the expected lines and no [mask] header."""
    rendered = GEN.render_template(
        ds_ref='postgresql/prod-hobby',
        database='hobby_stocks',
        schema='public',
        table='users',
        columns=[('id', 'id'), ('name', '姓名'), ('create_time', '创建时间')],
        pk='id',
    )
    expected_fragments = (
        'dataSource = postgresql/prod-hobby',
        'database = hobby_stocks',
        'table = public.users',
        'column = id,name,create_time',
        'splitPk = id',
        "where = update_time >= '${start_date}' AND update_time < '${stop_date}'",
        'path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/',
        'fileName = users_TODO_d',
    )
    for fragment in expected_fragments:
        assert fragment in rendered
    # When mask_methods is not passed, the [mask] section header must not be rendered.
    assert '\n[mask]\n' not in rendered
def test_render_template_with_mask_methods():
    """With mask_methods the template gains a [mask] section between [reader] and [writer]."""
    mask_methods = {'user_name': 'mask_middle', 'phone': 'md5'}
    rendered = GEN.render_template(
        ds_ref='postgresql/prod-hobby',
        database='db',
        schema='public',
        table='users',
        columns=[('id', 'id'), ('user_name', '用户名'), ('phone', '手机号')],
        pk='id',
        mask_methods=mask_methods,
    )
    assert '\n[mask]\n' in rendered
    for entry in ('user_name = mask_middle', 'phone = md5'):
        assert entry in rendered
    # The [mask] section header sits after [reader] and before [writer].
    section_headers = ('\n[reader]\n', '\n[mask]\n', '\n[writer]\n')
    reader_idx, mask_idx, writer_idx = [rendered.index(header) for header in section_headers]
    assert reader_idx < mask_idx < writer_idx
def test_query_columns_full_returns_full_metadata():
    """The result equals the 5-tuple rows supplied by the mocked cursor's fetchall()."""
    expected_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '名称', 'character varying', ''),
    ]
    conn = MagicMock()
    # Hand the mock its own copy so the comparison is against an independent list.
    conn.cursor.return_value.fetchall.return_value = list(expected_rows)
    assert GEN.query_columns_full(conn, 'public', 'orders') == expected_rows
def test_render_schema_md_no_mask_dict_blank_column():
    """Without a mask dict the last table column is blank; a None comment renders as empty."""
    rendered = GEN.render_schema_md([
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'user_name', '用户名', 'character varying', ''),
        (3, 'create_time', None, 'timestamp without time zone', ''),
    ])
    expected_lines = (
        '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 脱敏类型 |',
        '| 1 | `id` | id | bigint | PK | |',
        '| 2 | `user_name` | 用户名 | character varying | | |',
        '| 3 | `create_time` | | timestamp without time zone | | |',
    )
    for line in expected_lines:
        assert line in rendered
def test_render_schema_md_with_mask_dict():
    """Columns named in the mask dict show their mask method; other columns stay blank."""
    mask_dict = {'phone': 'md5', 'merchant_open': 'trim', 'user_name': 'mask_middle'}
    rendered = GEN.render_schema_md(
        [
            (1, 'id', 'id', 'bigint', 'PK'),
            (2, 'user_name', '用户名', 'character varying', ''),
            (3, 'phone', '手机号', 'character varying', ''),
            (4, 'merchant_open', '商家代开', 'smallint', ''),
        ],
        mask_dict,
    )
    expected_lines = (
        '| 1 | `id` | id | bigint | PK | |',
        '| 2 | `user_name` | 用户名 | character varying | | mask_middle |',
        '| 3 | `phone` | 手机号 | character varying | | md5 |',
        '| 4 | `merchant_open` | 商家代开 | smallint | | trim |',
    )
    for line in expected_lines:
        assert line in rendered
def test_load_mask_conf_basic(tmp_path):
    """A [mask] section is loaded as a plain column -> method dict."""
    ini_text = '[mask]\npayment_num = trim\nphone = md5\nname = mask_middle\n'
    conf_file = tmp_path / 't.mask.ini'
    conf_file.write_text(ini_text, encoding='utf-8')

    loaded = GEN.load_mask_conf(str(conf_file))

    assert loaded == {
        'payment_num': 'trim',
        'phone': 'md5',
        'name': 'mask_middle',
    }
def test_load_mask_conf_no_section_returns_empty(tmp_path):
    """A config file that has no [mask] section loads as an empty dict."""
    conf_file = tmp_path / 't.mask.ini'
    conf_file.write_text('[other]\nfoo = bar\n', encoding='utf-8')
    loaded = GEN.load_mask_conf(str(conf_file))
    assert loaded == {}
def test_load_mask_conf_missing_file_raises():
    """A non-existent config path is expected to raise FileNotFoundError with the matching message."""
    missing_path = '/nonexistent/path/x.mask.ini'
    with pytest.raises(FileNotFoundError, match='mask 配置不存在'):
        GEN.load_mask_conf(missing_path)
def test_resolve_to_project_root_absolute_pass_through():
    """An already-absolute path is expected to come back unchanged."""
    absolute = os.path.abspath('/abs/path/x.ini')
    resolved = GEN._resolve_to_project_root(absolute)
    assert resolved == absolute
def test_resolve_to_project_root_relative_joins_project_root():
    """A relative path resolves to an absolute path that still ends with the relative part."""
    relative = 'jobs/raw/trd/x.mask.ini'
    resolved = GEN._resolve_to_project_root(relative)
    assert os.path.isabs(resolved)
    assert resolved.endswith(relative)
def test_render_template_empty_pk():
    """An empty pk renders a ``splitPk`` line with an empty value."""
    render_kwargs = {
        'ds_ref': 'postgresql/prod-hobby',
        'database': 'db',
        'schema': 'public',
        'table': 't',
        'columns': [('a', '')],
        'pk': '',
    }
    rendered = GEN.render_template(**render_kwargs)
    assert 'splitPk = \n' in rendered
def _patch_main_dependencies(monkeypatch):
    """Install shared mocks so main() touches neither a real PG nor a real datasource.

    Replaces ``GEN.resolve_datasource`` with a stub whose ``parse()`` returns a
    fixed JDBC URL and credentials, and registers a fake ``pg8000`` /
    ``pg8000.dbapi`` in ``sys.modules`` whose ``dbapi.connect()`` returns a mock
    connection. The mock cursor's ``fetchall()`` yields two column rows.

    Args:
        monkeypatch: the pytest ``monkeypatch`` fixture; every patch goes
            through it.
    """
    # Fake datasource: parse() hands back connection settings.
    datasource_stub = MagicMock()
    datasource_stub.parse.return_value = {
        GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
        'username': 'u',
        'password': 'p',
    }

    def _resolve_stub(ref):
        return datasource_stub

    monkeypatch.setattr(GEN, 'resolve_datasource', _resolve_stub)

    # Fake connection whose cursor returns two metadata rows.
    connection_stub = MagicMock()
    connection_stub.cursor.return_value.fetchall.return_value = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '名称', 'character varying', ''),
    ]

    # Fake pg8000 package, registered under both import names.
    pg8000_stub = MagicMock()
    pg8000_stub.dbapi.connect.return_value = connection_stub
    monkeypatch.setitem(sys.modules, 'pg8000', pg8000_stub)
    monkeypatch.setitem(sys.modules, 'pg8000.dbapi', pg8000_stub.dbapi)
def test_main_stdout_only_when_no_o(monkeypatch, capsys):
    """Without ``-o``, main() prints the schema table and template and reports no file write."""
    _patch_main_dependencies(monkeypatch)
    cli_args = [
        'datax-sync-template-gen.py',
        '-ds', 'postgresql/prod-hobby',
        '-t', 'public.users',
    ]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    stdout, stderr = capsys.readouterr()
    assert '| 序号 | 字段名 |' in stdout
    assert '[reader]' in stdout
    assert '已写入' not in stderr
def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
    """With ``-o <dir>``, main() still prints to stdout and also writes users.md / users.ini."""
    _patch_main_dependencies(monkeypatch)
    target_dir = tmp_path / 'out'
    cli_args = [
        'datax-sync-template-gen.py',
        '-ds', 'postgresql/prod-hobby',
        '-t', 'public.users',
        '-o', str(target_dir),
    ]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    stdout, stderr = capsys.readouterr()
    assert '| 序号 | 字段名 |' in stdout
    assert '[reader]' in stdout
    assert '已写入' in stderr
    for file_name in ('users.md', 'users.ini'):
        assert (target_dir / file_name).exists()
def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
    """With a bare ``-o``, main() prints to stdout and reports a write on stderr.

    ``GEN.WORKSPACE_DEFAULT`` is pointed at a temporary directory for the run.
    """
    _patch_main_dependencies(monkeypatch)
    workspace_dir = str(tmp_path / 'workspace')
    monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', workspace_dir)
    cli_args = [
        'datax-sync-template-gen.py',
        '-ds', 'postgresql/prod-hobby',
        '-t', 'public.users',
        '-o',
    ]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    stdout, stderr = capsys.readouterr()
    assert '| 序号 | 字段名 |' in stdout
    assert '[reader]' in stdout
    assert '已写入' in stderr
|