# -*- coding:utf-8 -*- """ datax-sync-template-gen 模板渲染 + JDBC URL 解析单测。 不连真 PG(query_columns / query_primary_key 走 mock conn)。 脚本路径含连字符,用 importlib.util 动态加载为模块。 """ import importlib.util import os import sys from unittest.mock import MagicMock import pytest PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'datax-sync-template-gen.py') def _load_script(): spec = importlib.util.spec_from_file_location('datax_sync_template_gen', SCRIPT_PATH) mod = importlib.util.module_from_spec(spec) sys.modules['datax_sync_template_gen'] = mod spec.loader.exec_module(mod) return mod GEN = _load_script() def test_parse_jdbc_url_with_port(): host, port, db = GEN.parse_jdbc_url('jdbc:postgresql://10.0.0.1:5433/hobby_stocks') assert host == '10.0.0.1' assert port == 5433 assert db == 'hobby_stocks' def test_parse_jdbc_url_default_port(): host, port, db = GEN.parse_jdbc_url('jdbc:postgresql://pg.example.com/mydb') assert host == 'pg.example.com' assert port == 5432 assert db == 'mydb' def test_parse_jdbc_url_invalid(): with pytest.raises(ValueError, match='无法解析'): GEN.parse_jdbc_url('mysql://10.0.0.1:3306/foo') def test_query_columns_returns_name_and_comment(): conn = MagicMock() cur = conn.cursor.return_value cur.fetchall.return_value = [ ('id', 'id'), ('user_id', '用户id'), ('create_time', None), # 无注释 ] cols = GEN.query_columns(conn, 'public', 'orders') assert cols == [('id', 'id'), ('user_id', '用户id'), ('create_time', '')] def test_query_columns_empty_raises(): conn = MagicMock() cur = conn.cursor.return_value cur.fetchall.return_value = [] with pytest.raises(ValueError, match='表不存在或无字段'): GEN.query_columns(conn, 'public', 'no_such_table') def test_query_primary_key_single(): conn = MagicMock() cur = conn.cursor.return_value cur.fetchall.return_value = [('id',)] assert GEN.query_primary_key(conn, 'public', 'orders') == 'id' def test_query_primary_key_composite_returns_empty(): conn = MagicMock() cur = conn.cursor.return_value cur.fetchall.return_value = [('id1',), ('id2',)] assert GEN.query_primary_key(conn, 'public', 'orders') == '' def test_query_primary_key_none_returns_empty(): conn = MagicMock() cur = conn.cursor.return_value cur.fetchall.return_value = [] assert GEN.query_primary_key(conn, 'public', 'orders') == '' def test_render_template_includes_required_fields(): columns = [('id', 'id'), ('name', '姓名'), ('create_time', '创建时间')] out = GEN.render_template( ds_ref='postgresql/prod-hobby', database='hobby_stocks', schema='public', table='users', columns=columns, pk='id', ) assert 'dataSource = postgresql/prod-hobby' in out assert 'database = hobby_stocks' in out assert 'table = public.users' in out assert 'column = id,name,create_time' in out assert 'splitPk = id' in out assert "where = update_time >= '${start_date}' AND update_time < '${stop_date}'" in out assert 'path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/' in out assert 'fileName = users_TODO_d' in out def test_render_template_empty_pk(): out = GEN.render_template( ds_ref='postgresql/prod-hobby', database='db', schema='public', table='t', columns=[('a', '')], pk='', ) assert 'splitPk = \n' in out