# -*- coding:utf-8 -*-
"""
Unit tests for hive-ddl-gen: DDL rendering, sync ini parsing, and deriving the
table name back from writer.path.
No real PostgreSQL is contacted (fetch_column_comments goes through a mock conn).
The script path contains a hyphen, so it is loaded dynamically as a module via
importlib.util.
"""
  7. import importlib.util
  8. import os
  9. import sys
  10. from unittest.mock import MagicMock
  11. import pytest
  12. PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  13. SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')
  14. def _load_script():
  15. spec = importlib.util.spec_from_file_location('hive_ddl_gen', SCRIPT_PATH)
  16. mod = importlib.util.module_from_spec(spec)
  17. sys.modules['hive_ddl_gen'] = mod
  18. spec.loader.exec_module(mod)
  19. return mod
  20. GEN = _load_script()
  21. def test_reverse_table_name_basic():
  22. assert GEN.reverse_table_name(
  23. '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
  24. ) == 'raw_trd_card_group_order_info_inc_d'
  25. def test_reverse_table_name_no_trailing_slash():
  26. assert GEN.reverse_table_name(
  27. '/user/hive/warehouse/raw.db/foo/dt=20260429'
  28. ) == 'foo'
  29. def test_reverse_table_name_missing_dt_segment_raises():
  30. with pytest.raises(ValueError, match='dt='):
  31. GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/')
  32. def test_parse_sync_ini_basic(tmp_path):
  33. p = tmp_path / 'sync.ini'
  34. p.write_text(
  35. '[reader]\n'
  36. 'dataSource = postgresql/prd-poyee\n'
  37. 'table = public.users\n'
  38. 'column = id, name, create_time\n'
  39. '\n'
  40. '[writer]\n'
  41. 'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
  42. encoding='utf-8',
  43. )
  44. spec = GEN.parse_sync_ini(str(p))
  45. assert spec == {
  46. 'ds_ref': 'postgresql/prd-poyee',
  47. 'schema': 'public',
  48. 'table': 'users',
  49. 'columns': ['id', 'name', 'create_time'],
  50. 'writer_path': '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/',
  51. }
  52. def test_parse_sync_ini_missing_file_raises():
  53. with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
  54. GEN.parse_sync_ini('/nonexistent/x.ini')
  55. def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
  56. p = tmp_path / 'bad.ini'
  57. p.write_text(
  58. '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n',
  59. encoding='utf-8',
  60. )
  61. with pytest.raises(KeyError, match='\\[writer\\]'):
  62. GEN.parse_sync_ini(str(p))
  63. def test_parse_sync_ini_table_without_dot_raises(tmp_path):
  64. p = tmp_path / 'bad.ini'
  65. p.write_text(
  66. '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
  67. '[writer]\npath = /x/dt=${dt}/\n',
  68. encoding='utf-8',
  69. )
  70. with pytest.raises(ValueError, match='schema.table'):
  71. GEN.parse_sync_ini(str(p))
  72. def test_parse_sync_ini_empty_column_raises(tmp_path):
  73. p = tmp_path / 'bad.ini'
  74. p.write_text(
  75. '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
  76. '[writer]\npath = /x/dt=${dt}/\n',
  77. encoding='utf-8',
  78. )
  79. with pytest.raises(ValueError, match='column'):
  80. GEN.parse_sync_ini(str(p))
  81. def test_render_raw_ddl_field_order_follows_columns():
  82. columns = ['id', 'name', 'create_time']
  83. comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
  84. out = GEN.render_raw_ddl('raw_usr_users_inc_d', columns, comments)
  85. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
  86. id_idx = out.index("'id'")
  87. name_idx = out.index("'姓名'")
  88. ct_idx = out.index("'创建时间'")
  89. assert id_idx < name_idx < ct_idx
  90. assert 'PARTITIONED BY (dt STRING)' in out
  91. assert 'STORED AS ORC' in out
  92. assert "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';" in out
  93. def test_render_raw_ddl_missing_comment_blank():
  94. out = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
  95. assert "col_no_cmt" in out
  96. assert "COMMENT ''" in out
  97. def test_render_raw_ddl_single_quote_in_comment_escaped():
  98. out = GEN.render_raw_ddl('t', ['col'], {'col': "don't"})
  99. assert "COMMENT 'don''t'" in out
  100. def test_render_raw_ddl_last_column_no_trailing_comma():
  101. out = GEN.render_raw_ddl('t', ['a', 'b'], {})
  102. field_lines = [l for l in out.split('\n') if l.startswith(' ')]
  103. assert len(field_lines) == 2
  104. assert field_lines[0].rstrip().endswith(',')
  105. assert not field_lines[1].rstrip().endswith(',')
  106. # 逗号在 COMMENT 'xxx' 末尾,不在 STRING 后
  107. assert 'STRING,' not in out
  108. assert "COMMENT ''," in field_lines[0]
  109. def test_render_raw_ddl_external_and_drop():
  110. out = GEN.render_raw_ddl('t', ['a'], {})
  111. assert 'DROP TABLE IF EXISTS raw.t;' in out
  112. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (' in out
  113. def _patch_main_dependencies(monkeypatch, tmp_path):
  114. """共享 mock:让 main() 不连真 PG / 真 datasource。"""
  115. sync_ini = tmp_path / 'sync.ini'
  116. sync_ini.write_text(
  117. '[reader]\n'
  118. 'dataSource = postgresql/prd-poyee\n'
  119. 'table = public.users\n'
  120. 'column = id, name\n'
  121. '\n'
  122. '[writer]\n'
  123. 'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
  124. encoding='utf-8',
  125. )
  126. fake_ds = MagicMock()
  127. fake_ds.parse.return_value = {
  128. GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
  129. 'username': 'u',
  130. 'password': 'p',
  131. }
  132. monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', lambda ref: fake_ds)
  133. fake_conn = MagicMock()
  134. fake_cur = fake_conn.cursor.return_value
  135. fake_cur.fetchall.return_value = [
  136. (1, 'id', 'id', 'bigint', 'PK'),
  137. (2, 'name', '姓名', 'character varying', ''),
  138. ]
  139. fake_pg8000 = MagicMock()
  140. fake_pg8000.dbapi.connect.return_value = fake_conn
  141. monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
  142. monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)
  143. return str(sync_ini)
  144. def test_main_l_ods_raises_not_implemented(monkeypatch, tmp_path):
  145. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  146. monkeypatch.setattr(sys, 'argv', [
  147. 'hive-ddl-gen.py', '-l', 'ods', '-ini', sync_ini,
  148. ])
  149. with pytest.raises(NotImplementedError, match='ods'):
  150. GEN.main()
  151. def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
  152. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  153. monkeypatch.setattr(sys, 'argv', [
  154. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini,
  155. ])
  156. GEN.main()
  157. captured = capsys.readouterr()
  158. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  159. assert "'姓名'" in captured.out
  160. assert '已写入' not in captured.err
  161. def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
  162. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  163. out_dir = tmp_path / 'out'
  164. monkeypatch.setattr(sys, 'argv', [
  165. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o', str(out_dir),
  166. ])
  167. GEN.main()
  168. captured = capsys.readouterr()
  169. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  170. assert '已写入' in captured.err
  171. assert (out_dir / 'raw_usr_users_inc_d_create.sql').exists()
  172. def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
  173. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  174. monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', str(tmp_path / 'workspace'))
  175. monkeypatch.setattr(sys, 'argv', [
  176. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o',
  177. ])
  178. GEN.main()
  179. captured = capsys.readouterr()
  180. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  181. assert '已写入' in captured.err