# test_hive_ddl_gen.py
  1. # -*- coding:utf-8 -*-
  2. """
  3. hive-ddl-gen 渲染 / sync ini 解析 / writer.path 反推单测。
  4. 不连真 PG(fetch_column_comments 走 mock conn)。
  5. 脚本路径含连字符,用 importlib.util 动态加载为模块。
  6. """
  7. import importlib.util
  8. import os
  9. import sys
  10. from unittest.mock import MagicMock
  11. import pytest
  12. PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  13. SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')
  14. def _load_script():
  15. spec = importlib.util.spec_from_file_location('hive_ddl_gen', SCRIPT_PATH)
  16. mod = importlib.util.module_from_spec(spec)
  17. sys.modules['hive_ddl_gen'] = mod
  18. spec.loader.exec_module(mod)
  19. return mod
# Module under test, loaded once when this test file is imported.
GEN = _load_script()
  21. def test_reverse_table_name_basic():
  22. assert GEN.reverse_table_name(
  23. '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
  24. ) == 'raw_trd_card_group_order_info_inc_d'
  25. def test_reverse_table_name_no_trailing_slash():
  26. assert GEN.reverse_table_name(
  27. '/user/hive/warehouse/raw.db/foo/dt=20260429'
  28. ) == 'foo'
  29. def test_reverse_table_name_missing_dt_segment_raises():
  30. with pytest.raises(ValueError, match='dt='):
  31. GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/')
  32. def test_parse_sync_ini_basic(tmp_path):
  33. p = tmp_path / 'sync.ini'
  34. p.write_text(
  35. '[reader]\n'
  36. 'dataSource = postgresql/prd-poyee\n'
  37. 'table = public.users\n'
  38. 'column = id, name, create_time\n'
  39. '\n'
  40. '[writer]\n'
  41. 'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
  42. encoding='utf-8',
  43. )
  44. spec = GEN.parse_sync_ini(str(p))
  45. assert spec == {
  46. 'ds_ref': 'postgresql/prd-poyee',
  47. 'schema': 'public',
  48. 'table': 'users',
  49. 'columns': ['id', 'name', 'create_time'],
  50. 'writer_path': '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/',
  51. }
  52. def test_parse_sync_ini_missing_file_raises():
  53. with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
  54. GEN.parse_sync_ini('/nonexistent/x.ini')
  55. def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
  56. p = tmp_path / 'bad.ini'
  57. p.write_text(
  58. '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n',
  59. encoding='utf-8',
  60. )
  61. with pytest.raises(KeyError, match='\\[writer\\]'):
  62. GEN.parse_sync_ini(str(p))
  63. def test_parse_sync_ini_table_without_dot_raises(tmp_path):
  64. p = tmp_path / 'bad.ini'
  65. p.write_text(
  66. '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
  67. '[writer]\npath = /x/dt=${dt}/\n',
  68. encoding='utf-8',
  69. )
  70. with pytest.raises(ValueError, match='schema.table'):
  71. GEN.parse_sync_ini(str(p))
  72. def test_parse_sync_ini_empty_column_raises(tmp_path):
  73. p = tmp_path / 'bad.ini'
  74. p.write_text(
  75. '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
  76. '[writer]\npath = /x/dt=${dt}/\n',
  77. encoding='utf-8',
  78. )
  79. with pytest.raises(ValueError, match='column'):
  80. GEN.parse_sync_ini(str(p))
  81. def test_render_raw_ddl_field_order_follows_columns():
  82. columns = ['id', 'name', 'create_time']
  83. comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
  84. out = GEN.render_raw_ddl('raw_usr_users_inc_d', columns, comments)
  85. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
  86. id_idx = out.index("'id'")
  87. name_idx = out.index("'姓名'")
  88. ct_idx = out.index("'创建时间'")
  89. assert id_idx < name_idx < ct_idx
  90. assert 'PARTITIONED BY (dt STRING)' in out
  91. assert 'STORED AS ORC' in out
  92. assert "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';" in out
  93. def test_render_raw_ddl_missing_comment_blank():
  94. out = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
  95. assert "col_no_cmt" in out
  96. assert "COMMENT ''" in out
  97. def test_render_raw_ddl_single_quote_in_comment_escaped():
  98. out = GEN.render_raw_ddl('t', ['col'], {'col': "don't"})
  99. assert "COMMENT 'don''t'" in out
  100. def test_render_raw_ddl_last_column_no_trailing_comma():
  101. out = GEN.render_raw_ddl('t', ['a', 'b'], {})
  102. field_lines = [l for l in out.split('\n') if l.startswith(' ')]
  103. assert len(field_lines) == 2
  104. assert field_lines[0].rstrip().endswith(',')
  105. assert not field_lines[1].rstrip().endswith(',')
  106. # 逗号在 COMMENT 'xxx' 末尾,不在 STRING 后
  107. assert 'STRING,' not in out
  108. assert "COMMENT ''," in field_lines[0]
  109. def test_render_raw_ddl_external_and_drop():
  110. out = GEN.render_raw_ddl('t', ['a'], {})
  111. assert 'DROP TABLE IF EXISTS raw.t;' in out
  112. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (' in out
  113. def _patch_main_dependencies(monkeypatch, tmp_path):
  114. """共享 mock:让 main() 不连真 PG / 真 datasource。"""
  115. sync_ini = tmp_path / 'sync.ini'
  116. sync_ini.write_text(
  117. '[reader]\n'
  118. 'dataSource = postgresql/prd-poyee\n'
  119. 'table = public.users\n'
  120. 'column = id, name\n'
  121. '\n'
  122. '[writer]\n'
  123. 'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
  124. encoding='utf-8',
  125. )
  126. fake_ds = MagicMock()
  127. fake_ds.parse.return_value = {
  128. GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
  129. 'username': 'u',
  130. 'password': 'p',
  131. }
  132. monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', lambda ref: fake_ds)
  133. fake_conn = MagicMock()
  134. fake_cur = fake_conn.cursor.return_value
  135. fake_cur.fetchall.return_value = [
  136. (1, 'id', 'id', 'bigint', 'PK'),
  137. (2, 'name', '姓名', 'character varying', ''),
  138. ]
  139. fake_pg8000 = MagicMock()
  140. fake_pg8000.dbapi.connect.return_value = fake_conn
  141. monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
  142. monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)
  143. return str(sync_ini)
  144. def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
  145. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  146. monkeypatch.setattr(sys, 'argv', [
  147. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini,
  148. ])
  149. GEN.main()
  150. captured = capsys.readouterr()
  151. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  152. assert "'姓名'" in captured.out
  153. assert '已写入' not in captured.err
  154. def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
  155. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  156. out_dir = tmp_path / 'out'
  157. monkeypatch.setattr(sys, 'argv', [
  158. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o', str(out_dir),
  159. ])
  160. GEN.main()
  161. captured = capsys.readouterr()
  162. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  163. assert '已写入' in captured.err
  164. assert (out_dir / 'raw_usr_users_inc_d_create.sql').exists()
  165. def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
  166. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  167. monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', str(tmp_path / 'workspace'))
  168. monkeypatch.setattr(sys, 'argv', [
  169. 'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o',
  170. ])
  171. GEN.main()
  172. captured = capsys.readouterr()
  173. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
  174. assert '已写入' in captured.err
  175. # ----------------------------------------------------------------------------
  176. # ods 层测试
  177. # ----------------------------------------------------------------------------
  178. def test_normalize_pg_type_strips_paren_args():
  179. assert GEN.normalize_pg_type('numeric(12,2)') == 'numeric'
  180. assert GEN.normalize_pg_type('character varying(64)') == 'character varying'
  181. def test_normalize_pg_type_strips_timezone_suffix():
  182. assert GEN.normalize_pg_type('timestamp(6) without time zone') == 'timestamp'
  183. assert GEN.normalize_pg_type('timestamp with time zone') == 'timestamp'
  184. def test_normalize_pg_type_lower_strip():
  185. assert GEN.normalize_pg_type(' BIGINT ') == 'bigint'
  186. def test_normalize_pg_type_passthrough_simple():
  187. assert GEN.normalize_pg_type('text') == 'text'
  188. assert GEN.normalize_pg_type('boolean') == 'boolean'
  189. def test_load_type_mapping_basic(tmp_path):
  190. p = tmp_path / 'pg-to-hive-type.ini'
  191. p.write_text(
  192. '[mapping]\ninteger = BIGINT\ntext = STRING\n', encoding='utf-8')
  193. m = GEN.load_type_mapping(str(p))
  194. assert m == {'integer': 'BIGINT', 'text': 'STRING'}
  195. def test_load_type_mapping_missing_file_raises():
  196. with pytest.raises(FileNotFoundError, match='类型映射 conf 不存在'):
  197. GEN.load_type_mapping('/nonexistent.ini')
  198. def test_load_type_mapping_missing_section_raises(tmp_path):
  199. p = tmp_path / 'bad.ini'
  200. p.write_text('[other]\nx = y\n', encoding='utf-8')
  201. with pytest.raises(KeyError, match='\\[mapping\\]'):
  202. GEN.load_type_mapping(str(p))
  203. def test_map_pg_to_hive_hits():
  204. m = {'integer': 'BIGINT', 'numeric': 'DECIMAL(20,4)', 'timestamp': 'TIMESTAMP'}
  205. assert GEN.map_pg_to_hive('integer', m) == 'BIGINT'
  206. assert GEN.map_pg_to_hive('numeric(12,2)', m) == 'DECIMAL(20,4)'
  207. assert GEN.map_pg_to_hive('timestamp(6) without time zone', m) == 'TIMESTAMP'
  208. def test_map_pg_to_hive_miss_raises():
  209. with pytest.raises(KeyError, match='不在 conf'):
  210. GEN.map_pg_to_hive('xml', {'integer': 'BIGINT'})
  211. def test_reverse_ods_table_name_basic():
  212. assert GEN.reverse_ods_table_name('raw_trd_card_group_info_inc_d') == 'ods_trd_card_group_info_inc_d'
  213. def test_reverse_ods_table_name_no_raw_prefix_raises():
  214. with pytest.raises(ValueError, match="raw_"):
  215. GEN.reverse_ods_table_name('ods_x')
  216. def _ods_type_mapping():
  217. return {
  218. 'integer': 'BIGINT', 'bigint': 'BIGINT', 'smallint': 'BIGINT',
  219. 'numeric': 'DECIMAL(20,4)',
  220. 'character varying': 'STRING', 'text': 'STRING',
  221. 'timestamp': 'TIMESTAMP', 'boolean': 'BOOLEAN',
  222. }
  223. def test_render_ods_ddl_field_types_mapped():
  224. columns = ['id', 'amount', 'create_time', 'is_active', 'name']
  225. full_rows = [
  226. (1, 'id', 'id', 'bigint', 'PK'),
  227. (2, 'amount', '金额', 'numeric(12,2)', ''),
  228. (3, 'create_time', '创建时间', 'timestamp(6) without time zone', ''),
  229. (4, 'is_active', '是否启用', 'boolean', ''),
  230. (5, 'name', '姓名', 'character varying(64)', ''),
  231. ]
  232. out = GEN.render_ods_ddl(
  233. 'raw_usr_users_inc_d', columns, full_rows, _ods_type_mapping())
  234. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in out
  235. assert 'BIGINT' in out
  236. assert 'DECIMAL(20,4)' in out
  237. assert 'TIMESTAMP' in out
  238. assert 'BOOLEAN' in out
  239. assert 'STRING' in out
  240. assert "LOCATION '/user/hive/warehouse/ods.db/ods_usr_users_inc_d';" in out
  241. def test_render_ods_ddl_appends_is_deleted_at_end():
  242. columns = ['id', 'name']
  243. full_rows = [
  244. (1, 'id', 'id', 'bigint', 'PK'),
  245. (2, 'name', '姓名', 'character varying', ''),
  246. ]
  247. out = GEN.render_ods_ddl(
  248. 'raw_usr_x_inc_d', columns, full_rows, _ods_type_mapping())
  249. name_idx = out.index("'姓名'")
  250. is_deleted_idx = out.index('is_deleted')
  251. assert name_idx < is_deleted_idx
  252. assert 'is_deleted' in out
  253. assert 'BOOLEAN' in out
  254. def test_render_ods_ddl_no_tech_fields():
  255. columns = ['id']
  256. full_rows = [(1, 'id', 'id', 'bigint', 'PK')]
  257. out = GEN.render_ods_ddl(
  258. 'raw_x_inc_d', columns, full_rows, _ods_type_mapping())
  259. assert 'etl_time' not in out
  260. assert 'src_sys' not in out
  261. assert 'src_tbl' not in out
  262. def test_render_ods_ddl_partition_orc_external():
  263. out = GEN.render_ods_ddl(
  264. 'raw_x_inc_d', ['id'], [(1, 'id', '', 'bigint', 'PK')], _ods_type_mapping())
  265. assert 'PARTITIONED BY (dt STRING)' in out
  266. assert 'STORED AS ORC' in out
  267. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_x_inc_d (' in out
  268. assert 'DROP TABLE IF EXISTS ods.ods_x_inc_d;' in out
  269. def test_render_ods_ddl_missing_column_pg_meta_raises():
  270. """sync ini reader.column 里有但 PG 元数据没有的字段,应该报错。"""
  271. with pytest.raises(KeyError, match='元数据缺失'):
  272. GEN.render_ods_ddl(
  273. 'raw_x_inc_d', ['id', 'ghost'],
  274. [(1, 'id', '', 'bigint', 'PK')], _ods_type_mapping())
  275. def test_main_l_ods_writes_ods_ddl_with_ods_filename(monkeypatch, capsys, tmp_path):
  276. sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
  277. out_dir = tmp_path / 'out'
  278. type_conf = tmp_path / 'pg-to-hive-type.ini'
  279. type_conf.write_text(
  280. '[mapping]\nbigint = BIGINT\ncharacter varying = STRING\n',
  281. encoding='utf-8',
  282. )
  283. # 让 main 读这个 tmp 的 type conf 而不是项目 conf
  284. monkeypatch.setattr(GEN, 'project_root', str(tmp_path))
  285. (tmp_path / 'conf').mkdir()
  286. (tmp_path / 'conf' / 'pg-to-hive-type.ini').write_text(
  287. type_conf.read_text(encoding='utf-8'), encoding='utf-8')
  288. monkeypatch.setattr(sys, 'argv', [
  289. 'hive-ddl-gen.py', '-l', 'ods', '-ini', sync_ini, '-o', str(out_dir),
  290. ])
  291. GEN.main()
  292. captured = capsys.readouterr()
  293. assert 'CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usr_users_inc_d (' in captured.out
  294. assert (out_dir / 'ods_usr_users_inc_d_create.sql').exists()