| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
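# -*- coding:utf-8 -*-
"""
Unit tests for datax-sync-template-gen: template rendering and JDBC URL parsing.
No real PG connection is made (query_columns / query_primary_key use a mock conn).
The script path contains hyphens, so it is loaded dynamically as a module via importlib.util.
"""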
- import importlib.util
- import os
- import sys
- from unittest.mock import MagicMock
- import pytest
# Project root: four directory levels above this test file.
PROJECT_ROOT = os.path.dirname(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(os.path.abspath(__file__))
        )
    )
)
# The script under test; its file name contains hyphens.
SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'datax-sync-template-gen.py')
def _load_script():
    """Load the script at SCRIPT_PATH as the module ``datax_sync_template_gen``.

    The script's file name contains hyphens, so it is loaded from its file
    path through ``importlib.util`` instead of a regular ``import``.

    Returns:
        The executed module object; it is also registered in ``sys.modules``
        under the name ``datax_sync_template_gen``.

    Raises:
        ImportError: If no import spec (or no loader) can be created for
            SCRIPT_PATH.
        Exception: Anything raised while executing the script is propagated
            after the partially initialised module has been removed from
            ``sys.modules``.
    """
    module_name = 'datax_sync_template_gen'
    spec = importlib.util.spec_from_file_location(module_name, SCRIPT_PATH)
    # spec_from_file_location may return None; fail with a clear message
    # instead of an AttributeError on ``spec.loader``.
    if spec is None or spec.loader is None:
        raise ImportError(
            'cannot create an import spec for {!r}'.format(SCRIPT_PATH)
        )
    mod = importlib.util.module_from_spec(spec)
    # Register before executing, as the import system does.
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind in sys.modules.
        sys.modules.pop(module_name, None)
        raise
    return mod
# Module under test, loaded once at import time and shared by every test below.
GEN = _load_script()
def test_parse_jdbc_url_with_port():
    """A JDBC URL with an explicit port yields that host, port and database."""
    url = 'jdbc:postgresql://10.0.0.1:5433/hobby_stocks'
    host, port, db = GEN.parse_jdbc_url(url)
    assert (host, port, db) == ('10.0.0.1', 5433, 'hobby_stocks')
def test_parse_jdbc_url_default_port():
    """A JDBC URL without a port is expected to come back with port 5432."""
    url = 'jdbc:postgresql://pg.example.com/mydb'
    host, port, db = GEN.parse_jdbc_url(url)
    assert (host, port, db) == ('pg.example.com', 5432, 'mydb')
def test_parse_jdbc_url_invalid():
    """A non-``jdbc:postgresql`` URL must raise ValueError matching '无法解析'."""
    bad_url = 'mysql://10.0.0.1:3306/foo'
    with pytest.raises(ValueError, match='无法解析'):
        GEN.parse_jdbc_url(bad_url)
def test_query_columns_returns_name_and_comment():
    """query_columns returns (name, comment) pairs, with a None comment as ''."""
    fetched_rows = [
        ('id', 'id'),
        ('user_id', '用户id'),
        ('create_time', None),  # column without a comment
    ]
    conn = MagicMock()
    conn.cursor.return_value.fetchall.return_value = fetched_rows

    result = GEN.query_columns(conn, 'public', 'orders')

    expected = [('id', 'id'), ('user_id', '用户id'), ('create_time', '')]
    assert result == expected
def test_query_columns_empty_raises():
    """An empty fetchall() result must raise ValueError matching '表不存在或无字段'."""
    conn = MagicMock()
    conn.cursor.return_value.fetchall.return_value = []
    with pytest.raises(ValueError, match='表不存在或无字段'):
        GEN.query_columns(conn, 'public', 'no_such_table')
def test_query_primary_key_single():
    """With one primary-key row, query_primary_key returns that column name."""
    conn = MagicMock()
    conn.cursor.return_value.fetchall.return_value = [('id',)]
    pk = GEN.query_primary_key(conn, 'public', 'orders')
    assert pk == 'id'
def test_query_primary_key_composite_returns_empty():
    """With two primary-key rows, query_primary_key returns an empty string."""
    conn = MagicMock()
    conn.cursor.return_value.fetchall.return_value = [('id1',), ('id2',)]
    pk = GEN.query_primary_key(conn, 'public', 'orders')
    assert pk == ''
def test_query_primary_key_none_returns_empty():
    """With no primary-key rows, query_primary_key returns an empty string."""
    conn = MagicMock()
    conn.cursor.return_value.fetchall.return_value = []
    pk = GEN.query_primary_key(conn, 'public', 'orders')
    assert pk == ''
def test_render_template_includes_required_fields():
    """The rendered template contains every expected key/value line."""
    render_kwargs = {
        'ds_ref': 'postgresql/prod-hobby',
        'database': 'hobby_stocks',
        'schema': 'public',
        'table': 'users',
        'columns': [('id', 'id'), ('name', '姓名'), ('create_time', '创建时间')],
        'pk': 'id',
    }
    out = GEN.render_template(**render_kwargs)

    # Checked in this order so the first missing fragment is the one reported.
    expected_fragments = (
        'dataSource = postgresql/prod-hobby',
        'database = hobby_stocks',
        'table = public.users',
        'column = id,name,create_time',
        'splitPk = id',
        "where = update_time >= '${start_date}' AND update_time < '${stop_date}'",
        'path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/',
        'fileName = users_TODO_d',
    )
    for fragment in expected_fragments:
        assert fragment in out
def test_render_template_empty_pk():
    """An empty pk renders as 'splitPk = ' followed directly by a newline."""
    out = GEN.render_template(
        ds_ref='postgresql/prod-hobby',
        database='db',
        schema='public',
        table='t',
        columns=[('a', '')],
        pk='',
    )
    assert 'splitPk = \n' in out
|