| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import logging
- import threading
- import time
- from io import BytesIO
- from typing import Any, Optional
- import cv2
- from PIL import Image
- from app.core.config import CameraSettings, settings
- logger = logging.getLogger(__name__)
- class CameraUnavailableError(RuntimeError):
- """相机不可用,例如依赖缺失或硬件初始化失败。"""
- class CameraCaptureError(RuntimeError):
- """相机抓拍失败,例如采集帧失败或图像编码失败。"""
- class CameraService:
- """
- 树莓派相机服务。
- 设计上把硬件访问封装到这个类里,而不是直接写在接口函数中,
- 有几个明显好处:
- 1. 路由层只关心 HTTP 请求和响应,职责更清晰。
- 2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。
- 3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。
- """
- def __init__(self, settings: CameraSettings) -> None:
- self._settings = settings
- self._lock = threading.RLock()
- self._camera: Optional[Any] = None
- @property
- def is_initialized(self) -> bool:
- """用于健康检查,判断相机对象是否已经初始化完成。"""
- return self._camera is not None
- def initialize(self) -> None:
- """
- 显式初始化相机。
- 这个方法适合在 FastAPI 启动阶段预热相机,
- 也适合你以后做手动重连、后台巡检时复用。
- """
- with self._lock:
- self._ensure_camera()
- def capture_jpeg(self) -> bytes:
- """
- 抓拍一张图片,并直接返回 JPEG 二进制内容。
- """
- with self._lock:
- camera = self._ensure_camera()
- try:
- frame = camera.capture_array()
- frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
- except Exception as exc:
- raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc
- # 当前设备输出 shape=(H, W, 3),PIL 直接接收前3通道颜色正确
- if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
- raise CameraCaptureError(
- f"相机返回了无法识别的帧结构: shape={getattr(frame, 'shape', None)}"
- )
- try:
- image = Image.fromarray(frame[:, :, :3])
- # image.save("temp.jpg")
- buffer = BytesIO()
- image.save(
- buffer,
- format="JPEG",
- quality=self._settings.jpeg_quality,
- )
- return buffer.getvalue()
- except Exception as exc:
- raise CameraCaptureError("图片编码为 JPEG 失败。") from exc
- def close(self) -> None:
- """
- 关闭相机资源。
- FastAPI 服务退出时应调用这个方法,避免相机句柄残留,
- 否则后续重新启动服务时可能出现资源被占用的问题。
- """
- with self._lock:
- if self._camera is None:
- return
- try:
- self._camera.stop()
- except Exception:
- logger.exception("停止相机时发生异常。")
- finally:
- try:
- self._camera.close()
- except Exception:
- logger.exception("关闭相机句柄时发生异常。")
- finally:
- self._camera = None
- def _ensure_camera(self) -> Any:
- """
- 延迟初始化相机。
- """
- if self._camera is not None:
- return self._camera
- camera = None
- try:
- from libcamera import controls
- from picamera2 import Picamera2
- except ImportError as exc:
- raise CameraUnavailableError(
- "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。"
- ) from exc
- try:
- camera = Picamera2(camera_num=self._settings.camera_index)
- still_configuration = self._build_still_configuration(camera)
- camera.configure(still_configuration)
- camera.start()
- self._apply_optional_controls(camera, controls)
- time.sleep(self._settings.warmup_seconds)
- except Exception as exc:
- if camera is not None:
- try:
- camera.stop()
- except Exception:
- logger.exception("初始化失败后停止相机时发生异常。")
- try:
- camera.close()
- except Exception:
- logger.exception("初始化失败后关闭相机句柄时发生异常。")
- raise CameraUnavailableError(
- "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。"
- ) from exc
- self._camera = camera
- return camera
- def _build_still_configuration(self, camera: Any) -> Any:
- """
- 创建静态拍照配置。
- 默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB,
- 后续编码 JPEG 会更直接。
- 如果目标设备不支持指定格式,则自动回退到默认配置。
- """
- main_config = {
- "size": (self._settings.width, self._settings.height),
- }
- if self._settings.frame_format:
- main_config["format"] = self._settings.frame_format
- try:
- return camera.create_still_configuration(main=main_config)
- except Exception:
- if "format" not in main_config:
- raise
- logger.warning(
- "相机不支持指定的帧格式 %s,已回退到默认格式。",
- self._settings.frame_format,
- )
- main_config.pop("format")
- return camera.create_still_configuration(main=main_config)
- def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
- """
- 应用可选相机参数。
- 对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略:
- 支持就启用,不支持则记录日志,但不阻断整个服务启动。
- """
- if self._settings.enable_continuous_autofocus:
- try:
- camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
- except Exception:
- logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True)
- if self._settings.enable_auto_white_balance:
- try:
- camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
- except Exception:
- logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True)
- _camera_service: Optional[CameraService] = None
- _camera_service_lock = threading.Lock()
- def get_camera_service() -> CameraService:
- """
- 返回全局单例相机服务。
- 相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机,
- 会明显增加拍照延迟,也更容易触发硬件占用问题。
- """
- global _camera_service
- if _camera_service is not None:
- return _camera_service
- with _camera_service_lock:
- if _camera_service is None:
- _camera_service = CameraService(settings.camera)
- return _camera_service
|