# -*- coding:utf-8 -*- from unittest.mock import MagicMock, patch import pytest from dw_base.ds.api import DSClient @pytest.fixture def client(tmp_path): conf = tmp_path / 'ds.ini' conf.write_text( '[dolphinscheduler]\n' 'base_url = http://example/dolphinscheduler\n' 'token = TEST_TOKEN\n', encoding='utf-8', ) return DSClient(conf_path=str(conf)) def _mock_resp(status_code=200, json_data=None, text=''): """构造 requests.Response mock。json_data=None 表示 .json() 抛 ValueError。""" r = MagicMock(status_code=status_code, text=text) if json_data is None: r.json.side_effect = ValueError('not json') else: r.json.return_value = json_data return r # --- 初始化 --- def test_init_loads_conf(client): assert client.base_url == 'http://example/dolphinscheduler' assert client.token == 'TEST_TOKEN' assert client.session.headers['token'] == 'TEST_TOKEN' def test_init_missing_conf_raises(): with pytest.raises(RuntimeError, match='DS 配置文件不存在'): DSClient(conf_path='/nonexistent/path.ini') def test_init_strips_trailing_slash(tmp_path): conf = tmp_path / 'ds.ini' conf.write_text( '[dolphinscheduler]\n' 'base_url = http://example/dolphinscheduler/\n' 'token = T\n', encoding='utf-8', ) c = DSClient(conf_path=str(conf)) assert c.base_url == 'http://example/dolphinscheduler' # --- GET --- def test_get_2xx_returns_json(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'code': 0, 'data': 'ok'}) assert client.get('/projects') == {'code': 0, 'data': 'ok'} # 不传 params 时不应在 kwargs 出现 params mock_req.assert_called_once_with( 'GET', 'http://example/dolphinscheduler/projects') def test_get_with_params(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {}) client.get('/projects', params={'pageNo': 1, 'pageSize': 10}) mock_req.assert_called_once_with( 'GET', 'http://example/dolphinscheduler/projects', params={'pageNo': 1, 'pageSize': 10}) def test_get_strips_leading_slash(client): """path 带或不带前导 / 都拼成相同 URL。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {}) client.get('projects') assert mock_req.call_args[0][1] == 'http://example/dolphinscheduler/projects' def test_get_non_2xx_raises(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(500, json_data={}, text='Internal Server Error') # 改 status_code 不影响 json mock mock_req.return_value.status_code = 500 with pytest.raises(RuntimeError, match='GET .*failed rc=500'): client.get('/projects') def test_get_2xx_non_json_raises_friendly(client): """path 错被 SPA fallback 时抛 RuntimeError 含 raw text 前 200 字符。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, json_data=None, text='fallback') with pytest.raises(RuntimeError) as exc: client.get('/wrong/path') msg = str(exc.value) assert '返回非 JSON' in msg assert 'SPA fallback' in msg assert '' in msg def test_get_2xx_non_json_truncates_to_200(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, json_data=None, text='X' * 500) with pytest.raises(RuntimeError) as exc: client.get('/p') assert 'X' * 200 in str(exc.value) assert 'X' * 201 not in str(exc.value) # --- POST --- def test_post_json_body(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'ok': True}) assert client.post('/foo', json_body={'k': 'v'}) == {'ok': True} mock_req.assert_called_once_with( 'POST', 'http://example/dolphinscheduler/foo', json={'k': 'v'}) def test_post_form_data_uses_data_kwarg(client): """form_data 走 requests data= kwarg(form-encoded)。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'ok': True}) client.post('/foo', form_data={'name': 'x', 'desc': 'hi'}) mock_req.assert_called_once_with( 'POST', 'http://example/dolphinscheduler/foo', data={'name': 'x', 'desc': 'hi'}) def test_post_no_body(client): """不传 body 时 kwargs 不应有 data / json。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {}) client.post('/foo') mock_req.assert_called_once_with( 'POST', 'http://example/dolphinscheduler/foo') def test_post_2xx_non_json_raises_friendly(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, json_data=None, text='NOT JSON') with pytest.raises(RuntimeError, match='POST.*返回非 JSON'): client.post('/foo') def test_post_non_2xx_raises(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(403, {}, text='Forbidden') mock_req.return_value.status_code = 403 with pytest.raises(RuntimeError, match='POST .*failed rc=403'): client.post('/foo') def test_post_json_and_form_mutex(client): with pytest.raises(ValueError, match='互斥'): client.post('/foo', json_body={'k': 'v'}, form_data={'k': 'v'}) # --- PUT(与 POST 对称) --- def test_put_json_body(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'ok': True}) assert client.put('/foo/1', json_body={'k': 'v'}) == {'ok': True} mock_req.assert_called_once_with( 'PUT', 'http://example/dolphinscheduler/foo/1', json={'k': 'v'}) def test_put_form_data(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'ok': True}) client.put('/foo/1', form_data={'name': 'x'}) mock_req.assert_called_once_with( 'PUT', 'http://example/dolphinscheduler/foo/1', data={'name': 'x'}) def test_put_non_2xx_raises(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(400, {}, text='Bad Request') mock_req.return_value.status_code = 400 with pytest.raises(RuntimeError, match='PUT .*failed rc=400'): client.put('/foo/1') def test_put_2xx_non_json_raises(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, json_data=None, text='HTML') with pytest.raises(RuntimeError, match='PUT.*返回非 JSON'): client.put('/foo/1') def test_put_json_and_form_mutex(client): with pytest.raises(ValueError, match='互斥'): client.put('/foo/1', json_body={'k': 'v'}, form_data={'k': 'v'}) # --- expect_business_ok:业务码 != 0 抛错 --- def test_business_ok_pass_when_code_zero(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'code': 0, 'data': {'id': 1}}) result = client.post('/foo', form_data={}, expect_business_ok=True) assert result == {'code': 0, 'data': {'id': 1}} def test_business_ok_raises_when_code_nonzero(client): """2xx + code != 0 抛 RuntimeError 含完整 response。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp( 200, {'code': 10105, 'msg': "Required request parameter 'name' is not present"}) with pytest.raises(RuntimeError) as exc: client.post('/foo', form_data={}, expect_business_ok=True) msg = str(exc.value) assert 'POST /foo' in msg assert '业务码 != 0' in msg assert '10105' in msg def test_business_ok_off_default_passes_nonzero_code(client): """expect_business_ok=False(默认)时 code != 0 不抛错,原样返。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'code': 50036, 'msg': 'oops'}) result = client.post('/foo', form_data={}) assert result == {'code': 50036, 'msg': 'oops'} def test_business_ok_works_for_get(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'code': 1, 'msg': 'fail'}) with pytest.raises(RuntimeError, match='GET .*业务码 != 0'): client.get('/projects', expect_business_ok=True) def test_business_ok_works_for_put(client): with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, {'code': 7, 'msg': 'denied'}) with pytest.raises(RuntimeError, match='PUT .*业务码 != 0'): client.put('/foo/1', form_data={}, expect_business_ok=True) def test_business_ok_skips_non_dict_response(client): """响应不是 dict(罕见,如 list)时 business_ok 不应抛错。""" with patch.object(client.session, 'request') as mock_req: mock_req.return_value = _mock_resp(200, [1, 2, 3]) assert client.get('/p', expect_business_ok=True) == [1, 2, 3] # --- project_code([projects] 段映射)--- def test_project_code_loaded_from_conf(tmp_path): conf = tmp_path / 'ds.ini' conf.write_text( '[dolphinscheduler]\n' 'base_url = http://example/dolphinscheduler\n' 'token = T\n' '\n' '[projects]\n' 'poyee-data-warehouse = 171673813105472\n' 'manual = 171673781143360\n' 'test = 171965331686208\n', encoding='utf-8', ) c = DSClient(conf_path=str(conf)) assert c.project_code('poyee-data-warehouse') == 171673813105472 assert c.project_code('manual') == 171673781143360 assert c.project_code('test') == 171965331686208 def test_project_code_unknown_name_raises(tmp_path): conf = tmp_path / 'ds.ini' conf.write_text( '[dolphinscheduler]\n' 'base_url = http://example/dolphinscheduler\n' 'token = T\n' '[projects]\n' 'foo = 1\n', encoding='utf-8', ) c = DSClient(conf_path=str(conf)) with pytest.raises(RuntimeError) as exc: c.project_code('bar') msg = str(exc.value) assert '项目码未注册' in msg assert 'bar' in msg assert '[projects]' in msg def test_project_code_section_optional(client): """[projects] 段缺失时不报错;查询任何 name 抛 '未注册'。""" # fixture 的 ds.ini 没有 [projects] 段 assert client._projects == {} with pytest.raises(RuntimeError, match='项目码未注册'): client.project_code('any')