test_runner.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. # -*- coding:utf-8 -*-
  2. from unittest.mock import patch
  3. import pytest
  4. from dw_base.datax.runner import run_job
  5. class _RC:
  6. def __init__(self, code): self.returncode = code
  7. @patch('dw_base.datax.runner.subprocess.run')
  8. def test_run_job_local_two_subprocess_calls(mock_run, tmp_path):
  9. mock_run.return_value = _RC(0)
  10. rc = run_job(
  11. ini_path=str(tmp_path / 'x.ini'),
  12. start_date='20260422', stop_date='20260423',
  13. worker_host='cdhmaster02', current_host='cdhmaster02',
  14. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  15. datax_home='/opt/datax',
  16. )
  17. assert rc == 0
  18. assert mock_run.call_count == 2 # 生成 json + 执行 datax.py
  19. first_argv = mock_run.call_args_list[0][0][0]
  20. assert first_argv[0] == '/usr/bin/python3'
  21. assert 'dw_base.datax.cli' in first_argv
  22. assert 'gen-json' in first_argv
  23. @patch('dw_base.datax.runner.subprocess.run')
  24. def test_run_job_remote_uses_ssh(mock_run, tmp_path):
  25. mock_run.return_value = _RC(0)
  26. rc = run_job(
  27. ini_path=str(tmp_path / 'x.ini'),
  28. start_date='20260422', stop_date='20260423',
  29. worker_host='cdhnode02', current_host='cdhmaster02',
  30. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  31. datax_home='/opt/datax',
  32. )
  33. assert rc == 0
  34. first_argv = mock_run.call_args_list[0][0][0]
  35. assert first_argv[0] == 'ssh'
  36. assert first_argv[1] == 'cdhnode02'
  37. # remote_cmd 是单字符串参数
  38. assert 'dw_base.datax.cli' in first_argv[2]
  39. assert 'gen-json' in first_argv[2]
  40. @patch('dw_base.datax.runner.subprocess.run')
  41. def test_run_job_gen_failure_raises(mock_run, tmp_path):
  42. mock_run.return_value = _RC(1)
  43. with pytest.raises(RuntimeError, match='生成 DataX json 失败'):
  44. run_job(
  45. ini_path=str(tmp_path / 'x.ini'),
  46. start_date='20260422', stop_date='20260423',
  47. worker_host='cdhmaster02', current_host='cdhmaster02',
  48. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  49. datax_home='/opt/datax',
  50. )
  51. @patch('dw_base.datax.runner.subprocess.run')
  52. def test_run_job_skip_datax_only_runs_gen(mock_run, tmp_path):
  53. mock_run.return_value = _RC(0)
  54. rc = run_job(
  55. ini_path=str(tmp_path / 'x.ini'),
  56. start_date='20260422', stop_date='20260423',
  57. worker_host='cdhmaster02', current_host='cdhmaster02',
  58. base_dir=str(tmp_path), python3_path='/usr/bin/python3',
  59. datax_home='/opt/datax',
  60. skip_datax=True,
  61. )
  62. assert rc == 0
  63. assert mock_run.call_count == 1