# -*- coding:utf-8 -*-
"""
Unit tests for hive-ddl-gen: DDL rendering, sync ini parsing and deriving the
table name back from writer.path.

No real PG connection is made (fetch_column_comments goes through a mock conn).
The script path contains a hyphen, so the script is loaded dynamically as a
module with importlib.util.
"""
import importlib.util
import os
import sys
from unittest.mock import MagicMock

import pytest


def _ancestor(path, levels):
    """Return *path* after applying os.path.dirname to it *levels* times."""
    for _ in range(levels):
        path = os.path.dirname(path)
    return path


PROJECT_ROOT = _ancestor(os.path.abspath(__file__), 4)
SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')


def _load_script():
    """Load SCRIPT_PATH as module 'hive_ddl_gen', register it in sys.modules and return it."""
    module_spec = importlib.util.spec_from_file_location('hive_ddl_gen', SCRIPT_PATH)
    module = importlib.util.module_from_spec(module_spec)
    # Registered before execution, exactly as the loading sequence requires here.
    sys.modules['hive_ddl_gen'] = module
    module_spec.loader.exec_module(module)
    return module


GEN = _load_script()


def _write_ini(directory, filename, content):
    """Write *content* as UTF-8 to directory/filename and return the path as a str."""
    target = directory / filename
    target.write_text(content, encoding='utf-8')
    return str(target)


def _run_main(monkeypatch, capsys, cli_args):
    """Set sys.argv to the script name plus *cli_args*, call GEN.main() and return the captured output."""
    monkeypatch.setattr(sys, 'argv', ['hive-ddl-gen.py'] + list(cli_args))
    GEN.main()
    return capsys.readouterr()


def test_reverse_table_name_basic():
    """A writer path ending in 'dt=${dt}/' yields the directory name before the dt segment."""
    writer_path = '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
    assert GEN.reverse_table_name(writer_path) == 'raw_trd_card_group_order_info_inc_d'


def test_reverse_table_name_no_trailing_slash():
    """A writer path without a trailing slash is handled the same way."""
    writer_path = '/user/hive/warehouse/raw.db/foo/dt=20260429'
    assert GEN.reverse_table_name(writer_path) == 'foo'


def test_reverse_table_name_missing_dt_segment_raises():
    """A writer path without a dt= segment raises ValueError mentioning 'dt='."""
    with pytest.raises(ValueError, match='dt='):
        GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/')


def test_parse_sync_ini_basic(tmp_path):
    """A well-formed sync ini is parsed into the expected spec dict."""
    ini_path = _write_ini(
        tmp_path,
        'sync.ini',
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name, create_time\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
    )
    expected = {
        'ds_ref': 'postgresql/prd-poyee',
        'schema': 'public',
        'table': 'users',
        'columns': ['id', 'name', 'create_time'],
        'writer_path': '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/',
    }
    assert GEN.parse_sync_ini(ini_path) == expected


def test_parse_sync_ini_missing_file_raises():
    """A non-existent ini path raises FileNotFoundError."""
    with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
        GEN.parse_sync_ini('/nonexistent/x.ini')


def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
    """An ini without a [writer] section raises KeyError naming the section."""
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n',
    )
    with pytest.raises(KeyError, match='\\[writer\\]'):
        GEN.parse_sync_ini(ini_path)


def test_parse_sync_ini_table_without_dot_raises(tmp_path):
    """A reader.table value without a dot raises ValueError."""
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
        '[writer]\npath = /x/dt=${dt}/\n',
    )
    with pytest.raises(ValueError, match='schema.table'):
        GEN.parse_sync_ini(ini_path)


def test_parse_sync_ini_empty_column_raises(tmp_path):
    """An empty reader.column value raises ValueError."""
    ini_path = _write_ini(
        tmp_path,
        'bad.ini',
        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
        '[writer]\npath = /x/dt=${dt}/\n',
    )
    with pytest.raises(ValueError, match='column'):
        GEN.parse_sync_ini(ini_path)


def test_render_raw_ddl_field_order_follows_columns():
    """Column comments appear in the rendered raw DDL in the order of the columns list."""
    columns = ['id', 'name', 'create_time']
    comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
    out = GEN.render_raw_ddl('raw_usr_users_inc_d', columns, comments)

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out

    positions = [out.index(quoted) for quoted in ("'id'", "'姓名'", "'创建时间'")]
    assert positions[0] < positions[1] < positions[2]

    for fragment in (
        'PARTITIONED BY (dt STRING)',
        'STORED AS ORC',
        "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';",
    ):
        assert fragment in out


def test_render_raw_ddl_missing_comment_blank():
    """A column with no comment entry is rendered with an empty COMMENT."""
    out = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
    for fragment in ("col_no_cmt", "COMMENT ''"):
        assert fragment in out


def test_render_raw_ddl_single_quote_in_comment_escaped():
    """A single quote inside a comment is doubled in the rendered DDL."""
    out = GEN.render_raw_ddl('t', ['col'], {'col': "don't"})
    assert "COMMENT 'don''t'" in out


def test_render_raw_ddl_last_column_no_trailing_comma():
    """Only the first of two field lines ends with a comma."""
    out = GEN.render_raw_ddl('t', ['a', 'b'], {})
    field_lines = [line for line in out.split('\n') if line.startswith(' ')]
    assert len(field_lines) == 2

    first_field, last_field = field_lines
    assert first_field.rstrip().endswith(',')
    assert not last_field.rstrip().endswith(',')
    # The comma goes after COMMENT 'xxx', not right after STRING.
    assert 'STRING,' not in out
    assert "COMMENT ''," in first_field


def test_render_raw_ddl_external_and_drop():
    """The raw DDL contains both the DROP and the CREATE EXTERNAL statements."""
    out = GEN.render_raw_ddl('t', ['a'], {})
    for statement in (
        'DROP TABLE IF EXISTS raw.t;',
        'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (',
    ):
        assert statement in out


def _patch_main_dependencies(monkeypatch, tmp_path):
    """Shared mocks so that main() touches neither a real PG nor a real datasource.

    Writes a sync ini under tmp_path, replaces GEN.SYNC_GEN.resolve_datasource
    with a stub returning a mock datasource, and installs a fake pg8000 (and
    pg8000.dbapi) in sys.modules whose dbapi.connect returns a mock connection.

    Returns:
        The path of the generated sync ini as a str.
    """
    ini_path = _write_ini(
        tmp_path,
        'sync.ini',
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
    )

    fake_ds = MagicMock()
    fake_ds.parse.return_value = {
        GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
        'username': 'u',
        'password': 'p',
    }

    def _resolve(ref):
        return fake_ds

    monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', _resolve)

    fake_conn = MagicMock()
    fake_conn.cursor.return_value.fetchall.return_value = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '姓名', 'character varying', ''),
    ]

    fake_pg8000 = MagicMock()
    fake_pg8000.dbapi.connect.return_value = fake_conn
    monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
    monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)

    return ini_path


def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
    """Without -o the DDL goes to stdout and no 'written' notice goes to stderr."""
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    out, err = _run_main(monkeypatch, capsys, ['-l', 'raw', '-ini', sync_ini])

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
    assert "'姓名'" in out
    assert '已写入' not in err


def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
    """With -o <dir> the DDL goes to stdout and a *_create.sql file appears in that dir."""
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    out_dir = tmp_path / 'out'
    out, err = _run_main(
        monkeypatch, capsys, ['-l', 'raw', '-ini', sync_ini, '-o', str(out_dir)])

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
    assert '已写入' in err
    assert (out_dir / 'raw_usr_users_inc_d_create.sql').exists()


def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
    """With a bare -o the DDL goes to stdout and the 'written' notice goes to stderr."""
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', str(tmp_path / 'workspace'))
    out, err = _run_main(monkeypatch, capsys, ['-l', 'raw', '-ini', sync_ini, '-o'])

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
    assert '已写入' in err


# ----------------------------------------------------------------------------
# ods layer tests
# ----------------------------------------------------------------------------


def test_normalize_pg_type_strips_paren_args():
    """Parenthesised type arguments are removed."""
    expectations = {
        'numeric(12,2)': 'numeric',
        'character varying(64)': 'character varying',
    }
    for pg_type, normalized in expectations.items():
        assert GEN.normalize_pg_type(pg_type) == normalized


def test_normalize_pg_type_strips_timezone_suffix():
    """The with/without time zone suffix is removed from timestamp types."""
    for pg_type in ('timestamp(6) without time zone', 'timestamp with time zone'):
        assert GEN.normalize_pg_type(pg_type) == 'timestamp'


def test_normalize_pg_type_lower_strip():
    """Input is lower-cased and surrounding whitespace is stripped."""
    assert GEN.normalize_pg_type(' BIGINT ') == 'bigint'


def test_normalize_pg_type_passthrough_simple():
    """Simple type names are returned unchanged."""
    for pg_type in ('text', 'boolean'):
        assert GEN.normalize_pg_type(pg_type) == pg_type


def test_load_type_mapping_basic(tmp_path):
    """The [mapping] section is loaded into a plain dict."""
    conf_path = _write_ini(
        tmp_path,
        'pg-to-hive-type.ini',
        '[mapping]\ninteger = BIGINT\ntext = STRING\n',
    )
    assert GEN.load_type_mapping(conf_path) == {'integer': 'BIGINT', 'text': 'STRING'}


def test_load_type_mapping_missing_file_raises():
    """A non-existent mapping conf raises FileNotFoundError."""
    with pytest.raises(FileNotFoundError, match='类型映射 conf 不存在'):
        GEN.load_type_mapping('/nonexistent.ini')


def test_load_type_mapping_missing_section_raises(tmp_path):
    """A conf without a [mapping] section raises KeyError naming the section."""
    conf_path = _write_ini(tmp_path, 'bad.ini', '[other]\nx = y\n')
    with pytest.raises(KeyError, match='\\[mapping\\]'):
        GEN.load_type_mapping(conf_path)


def test_map_pg_to_hive_hits():
    """PG types present in the mapping (after normalisation) resolve to their Hive type."""
    mapping = {'integer': 'BIGINT', 'numeric': 'DECIMAL(20,4)', 'timestamp': 'TIMESTAMP'}
    cases = (
        ('integer', 'BIGINT'),
        ('numeric(12,2)', 'DECIMAL(20,4)'),
        ('timestamp(6) without time zone', 'TIMESTAMP'),
    )
    for pg_type, hive_type in cases:
        assert GEN.map_pg_to_hive(pg_type, mapping) == hive_type


def test_map_pg_to_hive_miss_raises():
    """A PG type absent from the mapping raises KeyError."""
    with pytest.raises(KeyError, match='不在 conf'):
        GEN.map_pg_to_hive('xml', {'integer': 'BIGINT'})


def test_reverse_ods_table_name_basic():
    """The raw_ prefix is replaced by ods_."""
    ods_name = GEN.reverse_ods_table_name('raw_trd_card_group_info_inc_d')
    assert ods_name == 'ods_trd_card_group_info_inc_d'


def test_reverse_ods_table_name_no_raw_prefix_raises():
    """A table name without the raw_ prefix raises ValueError."""
    with pytest.raises(ValueError, match="raw_"):
        GEN.reverse_ods_table_name('ods_x')


def _ods_type_mapping():
    """Return the PG-to-Hive type mapping shared by the ods rendering tests."""
    mapping = dict.fromkeys(('integer', 'bigint', 'smallint'), 'BIGINT')
    mapping['numeric'] = 'DECIMAL(20,4)'
    mapping.update(dict.fromkeys(('character varying', 'text'), 'STRING'))
    mapping['timestamp'] = 'TIMESTAMP'
    mapping['boolean'] = 'BOOLEAN'
    return mapping


def test_render_ods_ddl_field_types_mapped():
    """The ods DDL contains the mapped Hive types and the ods LOCATION."""
    columns = ['id', 'amount', 'create_time', 'is_active', 'name']
    full_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'amount', '金额', 'numeric(12,2)', ''),
        (3, 'create_time', '创建时间', 'timestamp(6) without time zone', ''),
        (4, 'is_active', '是否启用', 'boolean', ''),
        (5, 'name', '姓名', 'character varying(64)', ''),
    ]
    out = GEN.render_ods_ddl(
        'raw_usr_users_inc_d', columns, full_rows, _ods_type_mapping())

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in out
    for hive_type in ('BIGINT', 'DECIMAL(20,4)', 'TIMESTAMP', 'BOOLEAN', 'STRING'):
        assert hive_type in out
    assert "LOCATION '/user/hive/warehouse/ods.db/ods_usr_users_inc_d';" in out


def test_render_ods_ddl_appends_is_deleted_at_end():
    """is_deleted shows up after the last source column's comment."""
    full_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '姓名', 'character varying', ''),
    ]
    out = GEN.render_ods_ddl(
        'raw_usr_x_inc_d', ['id', 'name'], full_rows, _ods_type_mapping())

    assert out.index("'姓名'") < out.index('is_deleted')
    assert 'is_deleted' in out
    assert 'BOOLEAN' in out


def test_render_ods_ddl_no_tech_fields():
    """The ods DDL carries none of the etl_time / src_sys / src_tbl fields."""
    out = GEN.render_ods_ddl(
        'raw_x_inc_d', ['id'], [(1, 'id', 'id', 'bigint', 'PK')], _ods_type_mapping())
    for tech_field in ('etl_time', 'src_sys', 'src_tbl'):
        assert tech_field not in out


def test_render_ods_ddl_partition_orc_external():
    """The ods DDL is partitioned by dt, stored as ORC, external, and preceded by a DROP."""
    out = GEN.render_ods_ddl(
        'raw_x_inc_d', ['id'], [(1, 'id', '', 'bigint', 'PK')], _ods_type_mapping())
    for fragment in (
        'PARTITIONED BY (dt STRING)',
        'STORED AS ORC',
        'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_x_inc_d (',
        'DROP TABLE IF EXISTS ods.ods_x_inc_d;',
    ):
        assert fragment in out


def test_render_ods_ddl_missing_column_pg_meta_raises():
    """A column listed in sync ini reader.column but absent from PG metadata must raise."""
    pg_rows = [(1, 'id', '', 'bigint', 'PK')]
    with pytest.raises(KeyError, match='元数据缺失'):
        GEN.render_ods_ddl(
            'raw_x_inc_d', ['id', 'ghost'], pg_rows, _ods_type_mapping())


def test_main_l_ods_writes_ods_ddl_with_ods_filename(monkeypatch, capsys, tmp_path):
    """With -l ods the ods DDL goes to stdout and an ods_*_create.sql file is written."""
    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
    out_dir = tmp_path / 'out'

    type_conf = tmp_path / 'pg-to-hive-type.ini'
    type_conf.write_text(
        '[mapping]\nbigint = BIGINT\ncharacter varying = STRING\n',
        encoding='utf-8',
    )

    # Make main read this tmp type conf instead of the project conf.
    monkeypatch.setattr(GEN, 'project_root', str(tmp_path))
    conf_dir = tmp_path / 'conf'
    conf_dir.mkdir()
    (conf_dir / 'pg-to-hive-type.ini').write_text(
        type_conf.read_text(encoding='utf-8'), encoding='utf-8')

    out, _err = _run_main(
        monkeypatch, capsys, ['-l', 'ods', '-ini', sync_ini, '-o', str(out_dir)])

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in out
    assert (out_dir / 'ods_usr_users_inc_d_create.sql').exists()