test_runner.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # -*- coding:utf-8 -*-
  2. from unittest.mock import patch
  3. import pytest
  4. from dw_base.datax.runner import run_job
  5. class _RC:
  6. def __init__(self, code): self.returncode = code
  7. @patch('dw_base.datax.runner.subprocess.run')
  8. def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
  9. mock_run.return_value = _RC(0)
  10. rc = run_job(
  11. ini_path=str(tmp_path / 'x.ini'),
  12. start_date='20260422', stop_date='20260423',
  13. worker_host='cdhmaster02', current_host='cdhmaster02',
  14. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  15. datax_home='/opt/datax',
  16. )
  17. assert rc == 0
  18. assert mock_run.call_count == 2 # 生成 json + 执行 datax.py
  19. first_argv = mock_run.call_args_list[0][0][0]
  20. assert first_argv[0] == '/usr/bin/python3'
  21. assert 'datax-job-config-generator.py' in first_argv[2]
  22. @patch('dw_base.datax.runner.subprocess.run')
  23. def test_run_job_remote_uses_ssh(mock_run, tmp_path):
  24. mock_run.return_value = _RC(0)
  25. rc = run_job(
  26. ini_path=str(tmp_path / 'x.ini'),
  27. start_date='20260422', stop_date='20260423',
  28. worker_host='cdhnode02', current_host='cdhmaster02',
  29. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  30. datax_home='/opt/datax',
  31. )
  32. assert rc == 0
  33. first_argv = mock_run.call_args_list[0][0][0]
  34. assert first_argv[0] == 'ssh'
  35. assert first_argv[1] == 'cdhnode02'
  36. # remote_cmd 是单字符串参数
  37. assert 'datax-job-config-generator.py' in first_argv[2]
  38. @patch('dw_base.datax.runner.subprocess.run')
  39. def test_run_job_gen_failure_raises(mock_run, tmp_path):
  40. mock_run.return_value = _RC(1)
  41. with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
  42. run_job(
  43. ini_path=str(tmp_path / 'x.ini'),
  44. start_date='20260422', stop_date='20260423',
  45. worker_host='cdhmaster02', current_host='cdhmaster02',
  46. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  47. datax_home='/opt/datax',
  48. )
  49. @patch('dw_base.datax.runner.subprocess.run')
  50. def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
  51. mock_run.return_value = _RC(0)
  52. rc = run_job(
  53. ini_path=str(tmp_path / 'x.ini'),
  54. start_date='20260422', stop_date='20260423',
  55. worker_host='cdhmaster02', current_host='cdhmaster02',
  56. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  57. datax_home='/opt/datax',
  58. skip_datax=True,
  59. )
  60. assert rc == 0
  61. assert mock_run.call_count == 1