| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- # -*- coding:utf-8 -*-
- """
- hive-ddl-gen 渲染 / sync ini 解析 / writer.path 反推单测。
- 不连真 PG(fetch_column_comments 走 mock conn)。
- 脚本路径含连字符,用 importlib.util 动态加载为模块。
- """
- import importlib.util
- import os
- import sys
- from unittest.mock import MagicMock
- import pytest
# Repository root: four directory levels above this test file.
PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.abspath(__file__), *([os.pardir] * 4))
)
# Script under test; its path contains a hyphen, so it is loaded via importlib.
SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')
def _load_script():
    """Load ``bin/hive-ddl-gen.py`` as the module ``hive_ddl_gen``.

    The script's filename contains a hyphen, so it cannot be imported with a
    regular ``import`` statement; it is loaded from its file path instead.

    Returns:
        The executed module object, also registered in ``sys.modules``.

    Raises:
        ImportError: if no import spec / loader can be built for SCRIPT_PATH.
        Exception: whatever executing the script raises (e.g. a missing file).
    """
    module_name = 'hive_ddl_gen'
    spec = importlib.util.spec_from_file_location(module_name, SCRIPT_PATH)
    if spec is None or spec.loader is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ImportError(
            'cannot build an import spec for {!r}'.format(SCRIPT_PATH))
    mod = importlib.util.module_from_spec(spec)
    # Register before executing so the script can look itself up while loading.
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind for later importers.
        sys.modules.pop(module_name, None)
        raise
    return mod


# Loaded once at import time; every test below goes through this handle.
GEN = _load_script()
def test_reverse_table_name_basic():
    """A writer path ending in ``dt=${dt}/`` yields the table directory name."""
    writer_path = '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
    table_name = GEN.reverse_table_name(writer_path)
    assert table_name == 'raw_trd_card_group_order_info_inc_d'
def test_reverse_table_name_no_trailing_slash():
    """The table name is still recovered when the path has no trailing slash."""
    writer_path = '/user/hive/warehouse/raw.db/foo/dt=20260429'
    table_name = GEN.reverse_table_name(writer_path)
    assert table_name == 'foo'
def test_reverse_table_name_missing_dt_segment_raises():
    """A path without a ``dt=`` segment is rejected with ValueError."""
    path_without_partition = '/user/hive/warehouse/raw.db/foo/'
    with pytest.raises(ValueError, match='dt='):
        GEN.reverse_table_name(path_without_partition)
def test_parse_sync_ini_basic(tmp_path):
    """A well-formed sync ini is parsed into the expected spec dict."""
    ini_text = (
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name, create_time\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n'
    )
    ini_file = tmp_path / 'sync.ini'
    ini_file.write_text(ini_text, encoding='utf-8')

    expected_spec = {
        'ds_ref': 'postgresql/prd-poyee',
        'schema': 'public',
        'table': 'users',
        'columns': ['id', 'name', 'create_time'],
        'writer_path': '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/',
    }
    parsed = GEN.parse_sync_ini(str(ini_file))
    assert parsed == expected_spec
def test_parse_sync_ini_missing_file_raises():
    """A non-existent ini path raises FileNotFoundError with the expected text."""
    missing_path = '/nonexistent/x.ini'
    with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
        GEN.parse_sync_ini(missing_path)
def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
    """An ini with only a [reader] section raises KeyError mentioning [writer]."""
    reader_only_text = '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n'
    ini_file = tmp_path / 'bad.ini'
    ini_file.write_text(reader_only_text, encoding='utf-8')
    with pytest.raises(KeyError, match='\\[writer\\]'):
        GEN.parse_sync_ini(str(ini_file))
def test_parse_sync_ini_table_without_dot_raises(tmp_path):
    """A reader.table value lacking a ``schema.`` prefix raises ValueError."""
    ini_text = (
        '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
        '[writer]\npath = /x/dt=${dt}/\n'
    )
    ini_file = tmp_path / 'bad.ini'
    ini_file.write_text(ini_text, encoding='utf-8')
    with pytest.raises(ValueError, match='schema.table'):
        GEN.parse_sync_ini(str(ini_file))
def test_parse_sync_ini_empty_column_raises(tmp_path):
    """An empty reader.column value raises ValueError mentioning ``column``."""
    ini_text = (
        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
        '[writer]\npath = /x/dt=${dt}/\n'
    )
    ini_file = tmp_path / 'bad.ini'
    ini_file.write_text(ini_text, encoding='utf-8')
    with pytest.raises(ValueError, match='column'):
        GEN.parse_sync_ini(str(ini_file))
def test_render_raw_ddl_field_order_follows_columns():
    """Rendered raw DDL lists fields in the order given by ``columns``."""
    column_names = ['id', 'name', 'create_time']
    column_comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
    ddl = GEN.render_raw_ddl('raw_usr_users_inc_d', column_names, column_comments)

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in ddl

    # Locate each quoted comment; their offsets must be strictly increasing.
    pos_id = ddl.index("'id'")
    pos_name = ddl.index("'姓名'")
    pos_create_time = ddl.index("'创建时间'")
    assert pos_id < pos_name < pos_create_time

    for clause in (
        'PARTITIONED BY (dt STRING)',
        'STORED AS ORC',
        "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';",
    ):
        assert clause in ddl
def test_render_raw_ddl_missing_comment_blank():
    """A column with no known comment is rendered with an empty COMMENT."""
    ddl = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
    for fragment in ("col_no_cmt", "COMMENT ''"):
        assert fragment in ddl
def test_render_raw_ddl_single_quote_in_comment_escaped():
    """A single quote inside a comment is doubled in the rendered DDL."""
    comments_with_quote = {'col': "don't"}
    ddl = GEN.render_raw_ddl('t', ['col'], comments_with_quote)
    assert "COMMENT 'don''t'" in ddl
def test_render_raw_ddl_last_column_no_trailing_comma():
    """Only the non-final field line ends with a comma."""
    ddl = GEN.render_raw_ddl('t', ['a', 'b'], {})
    # Field lines are the ones starting with whitespace.
    field_lines = [line for line in ddl.split('\n') if line.startswith(' ')]
    assert len(field_lines) == 2
    first_field = field_lines[0]
    last_field = field_lines[1]
    assert first_field.rstrip().endswith(',')
    assert not last_field.rstrip().endswith(',')
    # The comma comes after COMMENT 'xxx', not right after STRING.
    assert 'STRING,' not in ddl
    assert "COMMENT ''," in first_field
def test_render_raw_ddl_external_and_drop():
    """Raw DDL contains both the DROP and the CREATE EXTERNAL statements."""
    ddl = GEN.render_raw_ddl('t', ['a'], {})
    for statement in (
        'DROP TABLE IF EXISTS raw.t;',
        'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (',
    ):
        assert statement in ddl
def _patch_main_dependencies(monkeypatch, tmp_path):
    """Install shared mocks so main() touches neither real PG nor a real datasource.

    Writes a sample sync ini under ``tmp_path``, replaces
    ``GEN.SYNC_GEN.resolve_datasource`` with a stub returning a fake
    datasource, and swaps ``pg8000`` / ``pg8000.dbapi`` in ``sys.modules``
    for mocks whose cursor returns two canned rows.

    Returns:
        The path of the generated sync ini, as a string.
    """
    ini_text = (
        '[reader]\n'
        'dataSource = postgresql/prd-poyee\n'
        'table = public.users\n'
        'column = id, name\n'
        '\n'
        '[writer]\n'
        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n'
    )
    ini_file = tmp_path / 'sync.ini'
    ini_file.write_text(ini_text, encoding='utf-8')

    # Fake datasource whose parse() yields fixed connection settings.
    fake_ds = MagicMock()
    fake_ds.parse.return_value = {
        GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
        'username': 'u',
        'password': 'p',
    }

    def _resolve(ref):
        return fake_ds

    monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', _resolve)

    # Fake connection: cursor().fetchall() returns the canned metadata rows.
    canned_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '姓名', 'character varying', ''),
    ]
    fake_conn = MagicMock()
    fake_conn.cursor.return_value.fetchall.return_value = canned_rows

    # Fake driver module so importing pg8000 never reaches the real package.
    fake_pg8000 = MagicMock()
    fake_pg8000.dbapi.connect.return_value = fake_conn
    monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
    monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)

    return str(ini_file)
def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
    """Without ``-o`` the DDL goes to stdout and no write notice is on stderr."""
    ini_path = _patch_main_dependencies(monkeypatch, tmp_path)
    cli_args = ['hive-ddl-gen.py', '-l', 'raw', '-ini', ini_path]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    streams = capsys.readouterr()
    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in streams.out
    assert "'姓名'" in streams.out
    assert '已写入' not in streams.err
def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
    """With ``-o <dir>`` the DDL is printed and also written into that directory."""
    ini_path = _patch_main_dependencies(monkeypatch, tmp_path)
    target_dir = tmp_path / 'out'
    cli_args = [
        'hive-ddl-gen.py', '-l', 'raw', '-ini', ini_path, '-o', str(target_dir),
    ]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    streams = capsys.readouterr()
    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in streams.out
    assert '已写入' in streams.err
    written_file = target_dir / 'raw_usr_users_inc_d_create.sql'
    assert written_file.exists()
def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
    """With a bare ``-o`` the DDL is printed and a write notice appears on stderr."""
    ini_path = _patch_main_dependencies(monkeypatch, tmp_path)
    # Point the default workspace at a temp dir so nothing lands in the project.
    workspace_dir = str(tmp_path / 'workspace')
    monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', workspace_dir)
    cli_args = ['hive-ddl-gen.py', '-l', 'raw', '-ini', ini_path, '-o']
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    streams = capsys.readouterr()
    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in streams.out
    assert '已写入' in streams.err
- # ----------------------------------------------------------------------------
- # ods 层测试
- # ----------------------------------------------------------------------------
def test_normalize_pg_type_strips_paren_args():
    """Parenthesised length/precision arguments are dropped from the type."""
    cases = (
        ('numeric(12,2)', 'numeric'),
        ('character varying(64)', 'character varying'),
    )
    for pg_type, expected in cases:
        assert GEN.normalize_pg_type(pg_type) == expected
def test_normalize_pg_type_strips_timezone_suffix():
    """``with/without time zone`` suffixes reduce to plain ``timestamp``."""
    cases = (
        'timestamp(6) without time zone',
        'timestamp with time zone',
    )
    for pg_type in cases:
        assert GEN.normalize_pg_type(pg_type) == 'timestamp'
def test_normalize_pg_type_lower_strip():
    """Surrounding whitespace is stripped and the type is lower-cased."""
    normalized = GEN.normalize_pg_type(' BIGINT ')
    assert normalized == 'bigint'
def test_normalize_pg_type_passthrough_simple():
    """Already-normalized simple type names come back unchanged."""
    for pg_type in ('text', 'boolean'):
        assert GEN.normalize_pg_type(pg_type) == pg_type
def test_load_type_mapping_basic(tmp_path):
    """The [mapping] section is returned as a plain dict of type pairs."""
    conf_file = tmp_path / 'pg-to-hive-type.ini'
    conf_text = '[mapping]\ninteger = BIGINT\ntext = STRING\n'
    conf_file.write_text(conf_text, encoding='utf-8')

    loaded = GEN.load_type_mapping(str(conf_file))

    assert loaded == {'integer': 'BIGINT', 'text': 'STRING'}
def test_load_type_mapping_missing_file_raises():
    """A non-existent mapping conf raises FileNotFoundError with the expected text."""
    missing_conf = '/nonexistent.ini'
    with pytest.raises(FileNotFoundError, match='类型映射 conf 不存在'):
        GEN.load_type_mapping(missing_conf)
def test_load_type_mapping_missing_section_raises(tmp_path):
    """A conf without a [mapping] section raises KeyError mentioning it."""
    conf_file = tmp_path / 'bad.ini'
    conf_file.write_text('[other]\nx = y\n', encoding='utf-8')
    conf_path = str(conf_file)
    with pytest.raises(KeyError, match='\\[mapping\\]'):
        GEN.load_type_mapping(conf_path)
def test_map_pg_to_hive_hits():
    """PG types, including decorated ones, resolve through the mapping."""
    mapping = {'integer': 'BIGINT', 'numeric': 'DECIMAL(20,4)', 'timestamp': 'TIMESTAMP'}
    cases = (
        ('integer', 'BIGINT'),
        ('numeric(12,2)', 'DECIMAL(20,4)'),
        ('timestamp(6) without time zone', 'TIMESTAMP'),
    )
    for pg_type, hive_type in cases:
        assert GEN.map_pg_to_hive(pg_type, mapping) == hive_type
def test_map_pg_to_hive_miss_raises():
    """A PG type absent from the mapping raises KeyError."""
    mapping = {'integer': 'BIGINT'}
    with pytest.raises(KeyError, match='不在 conf'):
        GEN.map_pg_to_hive('xml', mapping)
def test_reverse_ods_table_name_basic():
    """A ``raw_``-prefixed table name maps to its ``ods_`` counterpart."""
    ods_name = GEN.reverse_ods_table_name('raw_trd_card_group_info_inc_d')
    assert ods_name == 'ods_trd_card_group_info_inc_d'
def test_reverse_ods_table_name_no_raw_prefix_raises():
    """A table name lacking the ``raw_`` prefix raises ValueError."""
    name_without_prefix = 'ods_x'
    with pytest.raises(ValueError, match="raw_"):
        GEN.reverse_ods_table_name(name_without_prefix)
- def _ods_type_mapping():
- return {
- 'integer': 'BIGINT', 'bigint': 'BIGINT', 'smallint': 'BIGINT',
- 'numeric': 'DECIMAL(20,4)',
- 'character varying': 'STRING', 'text': 'STRING',
- 'timestamp': 'TIMESTAMP', 'boolean': 'BOOLEAN',
- }
def test_render_ods_ddl_field_types_mapped():
    """Each mapped Hive type shows up in the rendered ods DDL."""
    column_names = ['id', 'amount', 'create_time', 'is_active', 'name']
    pg_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'amount', '金额', 'numeric(12,2)', ''),
        (3, 'create_time', '创建时间', 'timestamp(6) without time zone', ''),
        (4, 'is_active', '是否启用', 'boolean', ''),
        (5, 'name', '姓名', 'character varying(64)', ''),
    ]
    type_mapping = _ods_type_mapping()

    ddl = GEN.render_ods_ddl('raw_usr_users_inc_d', column_names, pg_rows, type_mapping)

    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in ddl
    for hive_type in ('BIGINT', 'DECIMAL(20,4)', 'TIMESTAMP', 'BOOLEAN', 'STRING'):
        assert hive_type in ddl
    assert "LOCATION '/user/hive/warehouse/ods.db/ods_usr_users_inc_d';" in ddl
def test_render_ods_ddl_appends_is_deleted_at_end():
    """``is_deleted`` appears in the ods DDL after the last source column."""
    column_names = ['id', 'name']
    pg_rows = [
        (1, 'id', 'id', 'bigint', 'PK'),
        (2, 'name', '姓名', 'character varying', ''),
    ]
    type_mapping = _ods_type_mapping()

    ddl = GEN.render_ods_ddl('raw_usr_x_inc_d', column_names, pg_rows, type_mapping)

    # Compare offsets: the last source column's comment precedes is_deleted.
    last_source_pos = ddl.index("'姓名'")
    is_deleted_pos = ddl.index('is_deleted')
    assert last_source_pos < is_deleted_pos
    for fragment in ('is_deleted', 'BOOLEAN'):
        assert fragment in ddl
def test_render_ods_ddl_no_tech_fields():
    """The ods DDL contains none of etl_time / src_sys / src_tbl."""
    pg_rows = [(1, 'id', 'id', 'bigint', 'PK')]
    ddl = GEN.render_ods_ddl('raw_x_inc_d', ['id'], pg_rows, _ods_type_mapping())
    for tech_field in ('etl_time', 'src_sys', 'src_tbl'):
        assert tech_field not in ddl
def test_render_ods_ddl_partition_orc_external():
    """The ods DDL is partitioned by dt, stored as ORC, external, with a DROP."""
    pg_rows = [(1, 'id', '', 'bigint', 'PK')]
    ddl = GEN.render_ods_ddl('raw_x_inc_d', ['id'], pg_rows, _ods_type_mapping())
    expected_fragments = (
        'PARTITIONED BY (dt STRING)',
        'STORED AS ORC',
        'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_x_inc_d (',
        'DROP TABLE IF EXISTS ods.ods_x_inc_d;',
    )
    for fragment in expected_fragments:
        assert fragment in ddl
def test_render_ods_ddl_missing_column_pg_meta_raises():
    """A column listed in sync ini reader.column but absent from PG metadata must raise."""
    requested_columns = ['id', 'ghost']
    pg_rows = [(1, 'id', '', 'bigint', 'PK')]
    type_mapping = _ods_type_mapping()
    with pytest.raises(KeyError, match='元数据缺失'):
        GEN.render_ods_ddl('raw_x_inc_d', requested_columns, pg_rows, type_mapping)
def test_main_l_ods_writes_ods_ddl_with_ods_filename(monkeypatch, capsys, tmp_path):
    """``-l ods -o <dir>`` prints the ods DDL and writes an ods-named SQL file."""
    ini_path = _patch_main_dependencies(monkeypatch, tmp_path)
    target_dir = tmp_path / 'out'

    mapping_text = '[mapping]\nbigint = BIGINT\ncharacter varying = STRING\n'
    type_conf = tmp_path / 'pg-to-hive-type.ini'
    type_conf.write_text(mapping_text, encoding='utf-8')

    # Make main read the type conf under tmp_path rather than the project conf.
    monkeypatch.setattr(GEN, 'project_root', str(tmp_path))
    conf_dir = tmp_path / 'conf'
    conf_dir.mkdir()
    conf_copy = conf_dir / 'pg-to-hive-type.ini'
    conf_copy.write_text(type_conf.read_text(encoding='utf-8'), encoding='utf-8')

    cli_args = [
        'hive-ddl-gen.py', '-l', 'ods', '-ini', ini_path, '-o', str(target_dir),
    ]
    monkeypatch.setattr(sys, 'argv', cli_args)

    GEN.main()

    streams = capsys.readouterr()
    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in streams.out
    written_file = target_dir / 'ods_usr_users_inc_d_create.sql'
    assert written_file.exists()
|