# -*- coding:utf-8 -*-
"""
Unit tests for hive-ddl-gen: DDL rendering, sync ini parsing, and deriving the
table name from writer.path.

No real PG connection is made (fetch_column_comments runs against a mock conn).
The script path contains a hyphen, so it is loaded dynamically as a module via
importlib.util.
"""
import importlib.util
import os
import sys
from unittest.mock import MagicMock

import pytest

# Strip four trailing path components from this file's absolute path to reach
# the project root.
PROJECT_ROOT = os.path.abspath(__file__)
for _level in range(4):
    PROJECT_ROOT = os.path.dirname(PROJECT_ROOT)
SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')


def _load_script():
    """Load the script at SCRIPT_PATH as a module named ``hive_ddl_gen``."""
    module_name = 'hive_ddl_gen'
    module_spec = importlib.util.spec_from_file_location(module_name, SCRIPT_PATH)
    module = importlib.util.module_from_spec(module_spec)
    # The module is registered in sys.modules before its body is executed.
    sys.modules[module_name] = module
    module_spec.loader.exec_module(module)
    return module


GEN = _load_script()

# Fragments shared by several tests below.
_USERS_WRITER_PATH = '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/'
_USERS_CREATE_HEADER = 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d ('


def _write_ini(directory, filename, content):
    """Write ``content`` as UTF-8 to ``directory / filename`` and return the path object."""
    target = directory / filename
    target.write_text(content, encoding='utf-8')
    return target


def _set_argv(monkeypatch, level, sync_ini, *extra):
    """Patch sys.argv to ``hive-ddl-gen.py -l <level> -ini <sync_ini> [extra...]``."""
    argv = ['hive-ddl-gen.py', '-l', level, '-ini', sync_ini]
    argv.extend(extra)
    monkeypatch.setattr(sys, 'argv', argv)


def test_reverse_table_name_basic():
    writer_path = '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
    table_name = GEN.reverse_table_name(writer_path)
    assert table_name == 'raw_trd_card_group_order_info_inc_d'


def test_reverse_table_name_no_trailing_slash():
    table_name = GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/dt=20260429')
    assert table_name == 'foo'


def test_reverse_table_name_missing_dt_segment_raises():
    with pytest.raises(ValueError, match='dt='):
        GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/')


def test_parse_sync_ini_basic(tmp_path):
    ini_path = _write_ini(
        tmp_path,
        'sync.ini',
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name, create_time\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
    )
    expected = {
        'ds_ref': 'postgresql/prd-poyee',
        'schema': 'public',
        'table': 'users',
        'columns': ['id', 'name', 'create_time'],
        'writer_path': _USERS_WRITER_PATH,
    }
    spec = GEN.parse_sync_ini(str(ini_path))
    assert spec == expected


def test_parse_sync_ini_missing_file_raises():
    with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
        GEN.parse_sync_ini('/nonexistent/x.ini')


def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n',
    )
    with pytest.raises(KeyError, match='\\[writer\\]'):
        GEN.parse_sync_ini(str(ini_path))


def test_parse_sync_ini_table_without_dot_raises(tmp_path):
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
        '[writer]\npath = /x/dt=${dt}/\n',
    )
    with pytest.raises(ValueError, match='schema.table'):
        GEN.parse_sync_ini(str(ini_path))


def test_parse_sync_ini_empty_column_raises(tmp_path):
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
        '[writer]\npath = /x/dt=${dt}/\n',
    )
    with pytest.raises(ValueError, match='column'):
        GEN.parse_sync_ini(str(ini_path))


def test_render_raw_ddl_field_order_follows_columns():
    columns = ['id', 'name', 'create_time']
    comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
    ddl = GEN.render_raw_ddl('raw_usr_users_inc_d', columns, comments)
    assert _USERS_CREATE_HEADER in ddl
    # The quoted comments must appear in the same order as the columns.
    pos_id = ddl.index("'id'")
    pos_name = ddl.index("'姓名'")
    pos_create_time = ddl.index("'创建时间'")
    assert pos_id < pos_name < pos_create_time
    assert 'PARTITIONED BY (dt STRING)' in ddl
    assert 'STORED AS ORC' in ddl
    assert "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';" in ddl


def test_render_raw_ddl_missing_comment_blank():
    ddl = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
    assert "col_no_cmt" in ddl
    assert "COMMENT ''" in ddl


def test_render_raw_ddl_single_quote_in_comment_escaped():
    ddl = GEN.render_raw_ddl('t', ['col'], {'col': "don't"})
    assert "COMMENT 'don''t'" in ddl


def test_render_raw_ddl_last_column_no_trailing_comma():
    ddl = GEN.render_raw_ddl('t', ['a', 'b'], {})
    field_lines = [line for line in ddl.split('\n') if line.startswith(' ')]
    assert len(field_lines) == 2
    first_field, last_field = field_lines[0], field_lines[1]
    assert first_field.rstrip().endswith(',')
    assert not last_field.rstrip().endswith(',')
    # The comma sits at the end of COMMENT 'xxx', not right after STRING.
    assert 'STRING,' not in ddl
    assert "COMMENT ''," in first_field


def test_render_raw_ddl_external_and_drop():
    ddl = GEN.render_raw_ddl('t', ['a'], {})
    assert 'DROP TABLE IF EXISTS raw.t;' in ddl
    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (' in ddl


def _patch_main_dependencies(monkeypatch, tmp_path):
    """Shared mocks: keep main() away from a real PG / real datasource.

    Writes a sync ini under ``tmp_path``, replaces
    ``GEN.SYNC_GEN.resolve_datasource`` with a stub, and installs a fake
    ``pg8000`` / ``pg8000.dbapi`` in ``sys.modules``. Returns the ini path as
    a string.
    """
    sync_ini = _write_ini(
        tmp_path,
        'sync.ini',
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
    )

    # Stub datasource whose parse() yields fixed connection settings.
    fake_ds = MagicMock()
    fake_ds.parse.return_value = {
        GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
        'username': 'u',
        'password': 'p',
    }
    monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', lambda ref: fake_ds)

    # Stub connection whose cursor returns two canned rows from fetchall().
    fake_conn = MagicMock()
    fake_cur = fake_conn.cursor.return_value
    fake_cur.fetchall.return_value = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '姓名', 'character varying', ''),
    ]

    # Fake pg8000 package so that pg8000.dbapi.connect() hands back fake_conn.
    fake_pg8000 = MagicMock()
    fake_pg8000.dbapi.connect.return_value = fake_conn
    monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
    monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)

    return str(sync_ini)


def test_main_l_ods_raises_not_implemented(monkeypatch, tmp_path):
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    _set_argv(monkeypatch, 'ods', sync_ini)
    with pytest.raises(NotImplementedError, match='ods'):
        GEN.main()


def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    _set_argv(monkeypatch, 'raw', sync_ini)
    GEN.main()
    stdout, stderr = capsys.readouterr()
    assert _USERS_CREATE_HEADER in stdout
    assert "'姓名'" in stdout
    assert '已写入' not in stderr


def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    out_dir = tmp_path / 'out'
    _set_argv(monkeypatch, 'raw', sync_ini, '-o', str(out_dir))
    GEN.main()
    stdout, stderr = capsys.readouterr()
    assert _USERS_CREATE_HEADER in stdout
    assert '已写入' in stderr
    assert (out_dir / 'raw_usr_users_inc_d_create.sql').exists()


def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', str(tmp_path / 'workspace'))
    _set_argv(monkeypatch, 'raw', sync_ini, '-o')
    GEN.main()
    stdout, stderr = capsys.readouterr()
    assert _USERS_CREATE_HEADER in stdout
    assert '已写入' in stderr