| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- from datetime import datetime
- from fastapi import APIRouter, HTTPException, Query, Request, Response
- from app.core.config import settings
- from app.services import (
- CameraCaptureError,
- CameraUnavailableError,
- MqttCaptureBusyError,
- MqttCaptureError,
- get_camera_service,
- get_mqtt_capture_service,
- )
- router = APIRouter(
- prefix="/camera",
- tags=["camera"],
- )
- @router.api_route("/capture", methods=["GET"], summary="拍照并直接返回图片")
- def capture_image(
- download: bool = Query(
- default=False,
- description="是否以附件下载方式返回图片。默认 False,浏览器会直接预览。",
- ),
- ) -> Response:
- """
- 拍照接口。
- 调用这个接口时,服务会即时触发树莓派相机拍照,
- 然后把最新拍到的图片直接作为 HTTP 响应返回。
- """
- camera_service = get_camera_service()
- try:
- image_bytes = camera_service.capture_jpeg()
- except (CameraUnavailableError, CameraCaptureError) as exc:
- raise HTTPException(status_code=503, detail=str(exc)) from exc
- filename = f"capture_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
- disposition = "attachment" if download else "inline"
- return Response(
- content=image_bytes,
- media_type="image/jpeg",
- headers={
- "Content-Disposition": f'{disposition}; filename="{filename}"',
- "Cache-Control": "no-store",
- },
- )
- @router.api_route(
- "/capture-pair",
- methods=["GET", "POST"],
- summary="通过 MQTT 联动拍摄正反两张图片",
- )
- def capture_pair_via_mqtt(request: Request) -> dict[str, object]:
- """
- 机械臂联动拍照接口。
- 调用流程:
- 1. 本接口收到请求后,临时连接 MQTT。
- 2. 发送 start 指令给机械臂。
- 3. 收到 `id=1` / `id=2` 的拍照指令后,分别覆盖保存到静态目录。
- 4. 等待 `status=4` 作为流程结束信号。
- 5. 断开 MQTT,返回两张图片的静态访问地址。
- 返回 JSON 而不是直接返回图片,是因为这条流程一次会产出两张图,
- 更适合用 URL 的方式交给前端或其他服务继续处理。
- """
- mqtt_capture_service = get_mqtt_capture_service()
- try:
- result = mqtt_capture_service.capture_pair()
- except MqttCaptureBusyError as exc:
- raise HTTPException(status_code=409, detail=str(exc)) from exc
- except MqttCaptureError as exc:
- raise HTTPException(status_code=503, detail=str(exc)) from exc
- front_url = str(
- request.url_for("static", path=settings.static.front_relative_path.as_posix())
- )
- back_url = str(
- request.url_for("static", path=settings.static.back_relative_path.as_posix())
- )
- return {
- "request_id": result["request_id"],
- "status": result["status"],
- "front_url": front_url,
- "back_url": back_url,
- }
|