test_runner.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. # -*- coding:utf-8 -*-
  2. from unittest.mock import patch
  3. import pytest
  4. from dw_base.datax.runner import run_job
  5. class _RC:
  6. def __init__(self, code): self.returncode = code
  7. @patch('dw_base.datax.runner.subprocess.run')
  8. def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
  9. mock_run.return_value = _RC(0)
  10. rc = run_job(
  11. ini_path=str(tmp_path / 'x.ini'),
  12. start_date='20260422', stop_date='20260423',
  13. worker_host='cdhmaster02', current_host='cdhmaster02',
  14. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  15. datax_home='/opt/datax',
  16. )
  17. assert rc == 0
  18. assert mock_run.call_count == 2 # 生成 json + 执行 datax.py
  19. first_argv = mock_run.call_args_list[0][0][0]
  20. assert first_argv[0] == '/usr/bin/python3'
  21. assert 'dw_base.datax.cli' in first_argv
  22. assert 'gen-json' in first_argv
  23. @patch('dw_base.datax.runner.subprocess.run')
  24. def test_run_job_remote_uses_ssh(mock_run, tmp_path):
  25. mock_run.return_value = _RC(0)
  26. rc = run_job(
  27. ini_path=str(tmp_path / 'x.ini'),
  28. start_date='20260422', stop_date='20260423',
  29. worker_host='cdhnode02', current_host='cdhmaster02',
  30. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  31. datax_home='/opt/datax',
  32. )
  33. assert rc == 0
  34. first_argv = mock_run.call_args_list[0][0][0]
  35. assert first_argv[0] == 'ssh'
  36. assert first_argv[1] == 'cdhnode02'
  37. # remote_cmd 是单字符串参数,cd base_dir && <cmd>
  38. assert first_argv[2].startswith('cd ')
  39. assert 'dw_base.datax.cli' in first_argv[2]
  40. assert 'gen-json' in first_argv[2]
  41. @patch('dw_base.datax.runner.subprocess.run')
  42. def test_run_job_local_injects_pythonpath(mock_run, tmp_path):
  43. """本机 subprocess 必须带 PYTHONPATH=base_dir 让 python -m 能找到 dw_base 包。"""
  44. mock_run.return_value = _RC(0)
  45. run_job(
  46. ini_path=str(tmp_path / 'x.ini'),
  47. start_date='20260422', stop_date='20260423',
  48. worker_host='cdhmaster02', current_host='cdhmaster02',
  49. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  50. datax_home='/opt/datax',
  51. )
  52. call_kwargs = mock_run.call_args_list[0][1]
  53. assert 'env' in call_kwargs
  54. assert str(tmp_path) in call_kwargs['env']['PYTHONPATH']
  55. @patch('dw_base.datax.runner.subprocess.run')
  56. def test_run_job_gen_failure_raises(mock_run, tmp_path):
  57. mock_run.return_value = _RC(1)
  58. with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
  59. run_job(
  60. ini_path=str(tmp_path / 'x.ini'),
  61. start_date='20260422', stop_date='20260423',
  62. worker_host='cdhmaster02', current_host='cdhmaster02',
  63. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  64. datax_home='/opt/datax',
  65. )
  66. @patch('dw_base.datax.runner.subprocess.run')
  67. def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
  68. mock_run.return_value = _RC(0)
  69. rc = run_job(
  70. ini_path=str(tmp_path / 'x.ini'),
  71. start_date='20260422', stop_date='20260423',
  72. worker_host='cdhmaster02', current_host='cdhmaster02',
  73. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  74. datax_home='/opt/datax',
  75. skip_datax=True,
  76. )
  77. assert rc == 0
  78. assert mock_run.call_count == 1