test_entry.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. # -*- coding:utf-8 -*-
  2. """
  3. DataxImport / DataxExport 门面类单测。
  4. 不跑真实 subprocess / hive / ssh,全部 mock;只验"流程串联"(调了哪些模块、顺序、参数)。
  5. 真实链路由 tests/integration/datax/hive_import/ 集成测试覆盖。
  6. """
  7. import textwrap
  8. from pathlib import Path
  9. from unittest.mock import patch
  10. import pytest
  11. from dw_base.datax.entry import DataxExport, DataxImport
  @pytest.fixture
  def fake_env(tmp_path: Path):
      """Build a throwaway project layout for the facade tests.

      Under ``tmp_path`` this creates a ``release/proj`` project root, a
      ``workers.ini`` file, and a single smoke-test job ini at
      ``release/proj/jobs/raw/sample.ini``.

      Returns:
          dict: string-valued settings (paths, user, tool locations) plus the
          ``sample_ini`` file path and its containing ``ini_dir``.
      """
      release_root = tmp_path / 'release'
      project_root = release_root / 'proj'
      project_root.mkdir(parents=True)

      # Worker configuration consumed through ``workers_ini_path``.
      workers_file = tmp_path / 'workers.ini'
      workers_text = textwrap.dedent('''\
          [release]
          host = master1
          [weights]
          master1 = 1
          node1 = 3
          ''')
      workers_file.write_text(workers_text, encoding='utf-8')

      # One sample job definition inside <project>/jobs/raw.
      jobs_dir = project_root / 'jobs' / 'raw'
      jobs_dir.mkdir(parents=True)
      job_file = jobs_dir / 'sample.ini'
      job_text = textwrap.dedent('''\
          [reader]
          dataSource = postgresql/dev-x
          [writer]
          dataSource = hdfs/prd-ha
          path = /user/hive/warehouse/test.db/raw_sample/dt=${dt}/
          ''')
      job_file.write_text(job_text, encoding='utf-8')

      return {
          'base_dir': str(project_root),
          'workers_ini_path': str(workers_file),
          'release_user': 'bigdata',
          'release_root_dir': str(release_root),
          'python3_path': '/usr/bin/python3',
          'datax_home': '/opt/datax',
          'log_root_dir': str(tmp_path / 'log'),
          'sample_ini': str(job_file),
          'ini_dir': str(jobs_dir),
      }
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_import_calls_partition_then_runner(mock_exec_ddls, mock_run, fake_env):
      """DataxImport.run executes partition DDLs first, then runs the job.

      Both ``partition.execute_ddls`` and ``runner.run_job`` are patched, so
      nothing real is executed. The test checks that:

      * ``run`` returns 0 when ``run_job`` returns 0,
      * the DDL list handed to ``execute_ddls`` mentions the start-date
        partition,
      * ``execute_ddls`` is invoked before ``run_job`` (the ordering promised
        by the test name).
      """
      # Function-scope import: the module itself only imports ``patch``.
      from unittest.mock import Mock

      # Attaching both patched callables to one parent records their calls in
      # a single, ordered ``mock_calls`` list.
      recorder = Mock()
      recorder.attach_mock(mock_exec_ddls, 'execute_ddls')
      recorder.attach_mock(mock_run, 'run_job')

      importer = _make_importer(fake_env)
      failed = importer.run(
          inis=[fake_env['sample_ini']],
          inis_dirs=[],
          start_date='20260422',
          stop_date='20260423',
      )

      assert failed == 0
      assert mock_exec_ddls.called  # partition pre-creation was triggered
      ddls_called = mock_exec_ddls.call_args[0][0]
      assert any('PARTITION(dt=20260422)' in d for d in ddls_called)
      assert mock_run.called

      # First DDL execution must precede the first job run.
      call_names = [recorded[0] for recorded in recorder.mock_calls]
      assert call_names.index('execute_ddls') < call_names.index('run_job')
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_import_skip_partition_bypasses_ddl(mock_exec_ddls, mock_run, fake_env):
      """With ``skip_partition=True`` no DDL is executed but the job still runs.

      The importer is built through the module's ``_make_importer`` helper
      instead of repeating the seven constructor keyword arguments.
      """
      importer = _make_importer(fake_env)
      importer.run(
          inis=[fake_env['sample_ini']],
          inis_dirs=[],
          start_date='20260422',
          stop_date='20260423',
          skip_partition=True,
      )

      assert not mock_exec_ddls.called
      assert mock_run.called
  @patch('dw_base.datax.entry.runner.run_job', return_value=1)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_import_returns_failure_count(mock_exec_ddls, mock_run, fake_env):
      """``run`` reports one failure when the single job's ``run_job`` returns 1.

      The importer is built through the module's ``_make_importer`` helper
      instead of repeating the seven constructor keyword arguments.
      """
      importer = _make_importer(fake_env)
      failed = importer.run(
          inis=[fake_env['sample_ini']],
          inis_dirs=[],
          start_date='20260422',
          stop_date='20260423',
      )

      assert failed == 1
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_export_no_partition_phase(mock_exec_ddls, mock_run, fake_env):
      """DataxExport.run runs the job without ever executing partition DDLs."""
      # Only these fixture entries are constructor arguments; the fixture also
      # carries ``sample_ini`` / ``ini_dir`` which are passed to ``run``.
      ctor_fields = (
          'base_dir',
          'workers_ini_path',
          'release_user',
          'release_root_dir',
          'python3_path',
          'datax_home',
          'log_root_dir',
      )
      exporter = DataxExport(**{field: fake_env[field] for field in ctor_fields})

      failure_count = exporter.run(
          inis=[fake_env['sample_ini']],
          inis_dirs=[],
          start_date='20260422',
          stop_date='20260423',
      )

      assert failure_count == 0
      assert not mock_exec_ddls.called
      assert mock_run.called
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_import_expands_ini_dir(mock_exec_ddls, mock_run, fake_env):
      """Passing only a directory via ``inis_dirs`` still leads to a job run.

      The fixture directory contains ``sample.ini``; with ``inis`` empty,
      ``run_job`` being called shows the directory was expanded to that file.
      The importer is built through the module's ``_make_importer`` helper
      instead of repeating the seven constructor keyword arguments.
      """
      importer = _make_importer(fake_env)
      importer.run(
          inis=[],
          inis_dirs=[fake_env['ini_dir']],
          start_date='20260422',
          stop_date='20260423',
      )

      # The directory scan found sample.ini, so runner.run_job was invoked.
      assert mock_run.called
  def _make_importer(fake_env):
      """Construct a ``DataxImport`` from the matching ``fake_env`` entries.

      Only the seven constructor settings are forwarded; other keys present in
      ``fake_env`` (such as ``sample_ini``) are ignored here.
      """
      ctor_fields = (
          'base_dir',
          'workers_ini_path',
          'release_user',
          'release_root_dir',
          'python3_path',
          'datax_home',
          'log_root_dir',
      )
      ctor_kwargs = {field: fake_env[field] for field in ctor_fields}
      return DataxImport(**ctor_kwargs)
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_backfill_loops_days_inclusive_start_exclusive_stop(mock_exec_ddls, mock_run, fake_env):
      """backfill walks the half-open range [start, stop) one day at a time.

      Three days with one ini means ``run_job`` is called three times, each
      call receiving a single-day ``start_date`` / ``stop_date`` pair.
      """
      importer = _make_importer(fake_env)
      failures = importer.backfill(
          inis=[fake_env['sample_ini']],
          inis_dirs=[],
          start_date='20260420',
          stop_date='20260423',
      )

      assert failures == 0
      # 3 days x 1 ini -> 3 run_job invocations.
      assert mock_run.call_count == 3

      # Pull the date window out of each recorded call to check the daily slices.
      observed_windows = [
          (recorded.kwargs['start_date'], recorded.kwargs['stop_date'])
          for recorded in mock_run.call_args_list
      ]
      expected_windows = [
          ('20260420', '20260421'),
          ('20260421', '20260422'),
          ('20260422', '20260423'),
      ]
      assert observed_windows == expected_windows
  @patch('dw_base.datax.entry.runner.run_job', return_value=1)  # every run_job call fails
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_backfill_accumulates_failure_count(mock_exec_ddls, mock_run, fake_env):
      """All three days fail, so backfill returns 3."""
      backfill_args = {
          'inis': [fake_env['sample_ini']],
          'inis_dirs': [],
          'start_date': '20260420',
          'stop_date': '20260423',
      }
      importer = _make_importer(fake_env)

      total_failures = importer.backfill(**backfill_args)

      assert total_failures == 3
  @patch('dw_base.datax.entry.runner.run_job', return_value=0)
  @patch('dw_base.datax.entry.partition.execute_ddls')
  def test_backfill_invalid_range_raises(mock_exec_ddls, mock_run, fake_env):
      """stop_date <= start_date raises ValueError and no day is processed."""
      importer = _make_importer(fake_env)
      same_day = '20260423'

      with pytest.raises(ValueError, match='stop_date 必须大于 start_date'):
          importer.backfill(
              inis=[fake_env['sample_ini']],
              inis_dirs=[],
              start_date=same_day,
              stop_date=same_day,
          )

      # Checked after the context manager exits; inside it this line would be
      # unreachable once backfill raises.
      assert mock_run.call_count == 0