# test_runner.py
# -*- coding:utf-8 -*-
from types import SimpleNamespace
from unittest.mock import patch

import pytest

from dw_base.datax.runner import run_job
  5. class _RC:
  6. def __init__(self, code): self.returncode = code
  7. @patch('dw_base.datax.runner.subprocess.run')
  8. def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
  9. mock_run.return_value = _RC(0)
  10. rc = run_job(
  11. ini_path=str(tmp_path / 'x.ini'),
  12. start_date='20260422', stop_date='20260423',
  13. worker_host='cdhmaster02', current_host='cdhmaster02',
  14. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  15. datax_home='/opt/datax',
  16. )
  17. assert rc == 0
  18. assert mock_run.call_count == 2 # 生成 json + 执行 datax.py
  19. first_argv = mock_run.call_args_list[0][0][0]
  20. assert first_argv[0] == '/usr/bin/python3'
  21. assert 'dw_base.datax.cli' in first_argv
  22. assert 'gen-json' in first_argv
  23. @patch('dw_base.datax.runner.subprocess.run')
  24. def test_run_job_remote_uses_ssh(mock_run, tmp_path):
  25. mock_run.return_value = _RC(0)
  26. rc = run_job(
  27. ini_path=str(tmp_path / 'x.ini'),
  28. start_date='20260422', stop_date='20260423',
  29. worker_host='cdhnode02', current_host='cdhmaster02',
  30. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  31. datax_home='/opt/datax',
  32. )
  33. assert rc == 0
  34. first_argv = mock_run.call_args_list[0][0][0]
  35. assert first_argv[0] == 'ssh'
  36. assert first_argv[1] == 'cdhnode02'
  37. # remote_cmd 是单字符串参数,cd base_dir && <cmd>
  38. assert first_argv[2].startswith('cd ')
  39. assert 'dw_base.datax.cli' in first_argv[2]
  40. assert 'gen-json' in first_argv[2]
  41. @patch('dw_base.datax.runner.subprocess.run')
  42. def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
  43. """本机 subprocess 必须带 PYTHONPATH=base_dir 让 python -m 能找到 dw_base 包。"""
  44. mock_run.return_value = _RC(0)
  45. run_job(
  46. ini_path=str(tmp_path / 'x.ini'),
  47. start_date='20260422', stop_date='20260423',
  48. worker_host='cdhmaster02', current_host='cdhmaster02',
  49. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  50. datax_home='/opt/datax',
  51. )
  52. call_kwargs = mock_run.call_args_list[0][1]
  53. assert 'env' in call_kwargs
  54. assert str(tmp_path) in call_kwargs['env']['PYTHONPATH']
  55. @patch('dw_base.datax.runner.subprocess.run')
  56. def test_run_job_gen_failure_raises(mock_run, tmp_path):
  57. mock_run.return_value = _RC(1)
  58. with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
  59. run_job(
  60. ini_path=str(tmp_path / 'x.ini'),
  61. start_date='20260422', stop_date='20260423',
  62. worker_host='cdhmaster02', current_host='cdhmaster02',
  63. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  64. datax_home='/opt/datax',
  65. )
  66. @patch('dw_base.datax.runner.subprocess.run')
  67. def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
  68. mock_run.return_value = _RC(0)
  69. rc = run_job(
  70. ini_path=str(tmp_path / 'x.ini'),
  71. start_date='20260422', stop_date='20260423',
  72. worker_host='cdhmaster02', current_host='cdhmaster02',
  73. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  74. datax_home='/opt/datax',
  75. skip_datax=True,
  76. )
  77. assert rc == 0
  78. assert mock_run.call_count == 1
  79. import textwrap
  80. from dw_base.datax.runner import _hdfs_src_check
  81. def _ini_with_reader(tmp_path, reader_body):
  82. p = tmp_path / 'reader.ini'
  83. p.write_text(textwrap.dedent(reader_body), encoding='utf-8')
  84. return str(p)
  85. def test_hdfs_src_check_na_for_non_hdfs_reader(tmp_path):
  86. ini = _ini_with_reader(tmp_path, '''\
  87. [reader]
  88. dataSource = postgresql/dev-x
  89. path = /some/path
  90. ''')
  91. assert _hdfs_src_check(ini, '20260422') == 'n/a'
  92. def test_hdfs_src_check_na_when_no_reader(tmp_path):
  93. ini = tmp_path / 'empty.ini'
  94. ini.write_text('', encoding='utf-8')
  95. assert _hdfs_src_check(str(ini), '20260422') == 'n/a'
  96. @patch('dw_base.datax.runner.subprocess.run')
  97. def test_hdfs_src_check_missing(mock_run, tmp_path):
  98. # hadoop fs -test -e 返回非 0 → missing
  99. mock_run.return_value = _RC(1)
  100. ini = _ini_with_reader(tmp_path, '''\
  101. [reader]
  102. dataSource = hdfs/prd-ha
  103. path = /user/hive/warehouse/x/dt=${dt}/
  104. ''')
  105. assert _hdfs_src_check(ini, '20260422') == 'missing'
  106. @patch('dw_base.datax.runner.subprocess.run')
  107. def test_hdfs_src_check_empty_when_du_size_zero(mock_run, tmp_path):
  108. # 第一次调用 test -e → 0 成功,第二次调 du -s 返回 "0 .." → empty
  109. call_count = {'n': 0}
  110. def side_effect(*args, **kwargs):
  111. call_count['n'] += 1
  112. if call_count['n'] == 1:
  113. return _RC(0)
  114. return type('R', (), {'returncode': 0, 'stdout': b'0 0 /path\n', 'stderr': b''})()
  115. mock_run.side_effect = side_effect
  116. ini = _ini_with_reader(tmp_path, '''\
  117. [reader]
  118. dataSource = hdfs/prd-ha
  119. path = /user/hive/warehouse/x/dt=${dt}/
  120. ''')
  121. assert _hdfs_src_check(ini, '20260422') == 'empty'
  122. @patch('dw_base.datax.runner._hdfs_src_check', return_value='missing')
  123. @patch('dw_base.datax.runner.subprocess.run')
  124. def test_run_job_fails_when_hdfs_src_missing(mock_run, _mock_check, tmp_path):
  125. """默认 HDFS missing → rc=1(失败,不 silent skip)。"""
  126. rc = run_job(
  127. ini_path=str(tmp_path / 'x.ini'),
  128. start_date='20260422', stop_date='20260423',
  129. worker_host='cdhmaster02', current_host='cdhmaster02',
  130. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  131. datax_home='/opt/datax',
  132. )
  133. assert rc == 1
  134. # 直接失败 → 不调 gen / exec
  135. assert mock_run.call_count == 0
  136. @patch('dw_base.datax.runner._hdfs_src_check', return_value='empty')
  137. @patch('dw_base.datax.runner.subprocess.run')
  138. def test_run_job_fails_when_hdfs_src_empty(mock_run, _mock_check, tmp_path):
  139. """默认 HDFS empty → rc=1(失败)。"""
  140. mock_run.return_value = _RC(0)
  141. rc = run_job(
  142. ini_path=str(tmp_path / 'x.ini'),
  143. start_date='20260422', stop_date='20260423',
  144. worker_host='cdhmaster02', current_host='cdhmaster02',
  145. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  146. datax_home='/opt/datax',
  147. )
  148. assert rc == 1
  149. assert mock_run.call_count == 0
  150. @patch('dw_base.datax.runner._hdfs_src_check')
  151. @patch('dw_base.datax.runner.subprocess.run')
  152. def test_run_job_skip_check_bypasses_hdfs_check(mock_run, mock_check, tmp_path):
  153. """skip_check=True → 不调 _hdfs_src_check,直接 gen-json + 跑 datax。"""
  154. mock_run.return_value = _RC(0)
  155. rc = run_job(
  156. ini_path=str(tmp_path / 'x.ini'),
  157. start_date='20260422', stop_date='20260423',
  158. worker_host='cdhmaster02', current_host='cdhmaster02',
  159. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  160. datax_home='/opt/datax',
  161. skip_check=True,
  162. )
  163. assert rc == 0
  164. mock_check.assert_not_called()
  165. assert mock_run.call_count == 2 # gen + exec 都照跑