import logging import threading import time from io import BytesIO from typing import Any, Optional import cv2 from PIL import Image from app.core.config import CameraSettings, settings logger = logging.getLogger(__name__) class CameraUnavailableError(RuntimeError): """相机不可用,例如依赖缺失或硬件初始化失败。""" class CameraCaptureError(RuntimeError): """相机抓拍失败,例如采集帧失败或图像编码失败。""" class CameraService: """ 树莓派相机服务。 设计上把硬件访问封装到这个类里,而不是直接写在接口函数中, 有几个明显好处: 1. 路由层只关心 HTTP 请求和响应,职责更清晰。 2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。 3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。 """ def __init__(self, settings: CameraSettings) -> None: self._settings = settings self._lock = threading.RLock() self._camera: Optional[Any] = None @property def is_initialized(self) -> bool: """用于健康检查,判断相机对象是否已经初始化完成。""" return self._camera is not None def initialize(self) -> None: """ 显式初始化相机。 这个方法适合在 FastAPI 启动阶段预热相机, 也适合你以后做手动重连、后台巡检时复用。 """ with self._lock: self._ensure_camera() def capture_jpeg(self) -> bytes: """ 抓拍一张图片,并直接返回 JPEG 二进制内容。 """ with self._lock: camera = self._ensure_camera() try: frame = camera.capture_array() frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) except Exception as exc: raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc # 当前设备输出 shape=(H, W, 3),PIL 直接接收前3通道颜色正确 if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3: raise CameraCaptureError( f"相机返回了无法识别的帧结构: shape={getattr(frame, 'shape', None)}" ) try: image = Image.fromarray(frame[:, :, :3]) # image.save("temp.jpg") buffer = BytesIO() image.save( buffer, format="JPEG", quality=self._settings.jpeg_quality, ) return buffer.getvalue() except Exception as exc: raise CameraCaptureError("图片编码为 JPEG 失败。") from exc def close(self) -> None: """ 关闭相机资源。 FastAPI 服务退出时应调用这个方法,避免相机句柄残留, 否则后续重新启动服务时可能出现资源被占用的问题。 """ with self._lock: if self._camera is None: return try: self._camera.stop() except Exception: logger.exception("停止相机时发生异常。") finally: try: self._camera.close() except Exception: logger.exception("关闭相机句柄时发生异常。") finally: self._camera = None def _ensure_camera(self) -> Any: """ 延迟初始化相机。 """ if self._camera is not None: return self._camera camera = None try: from libcamera import controls from picamera2 import Picamera2 except ImportError as exc: raise CameraUnavailableError( "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。" ) from exc try: camera = Picamera2(camera_num=self._settings.camera_index) still_configuration = self._build_still_configuration(camera) camera.configure(still_configuration) camera.start() self._apply_optional_controls(camera, controls) time.sleep(self._settings.warmup_seconds) except Exception as exc: if camera is not None: try: camera.stop() except Exception: logger.exception("初始化失败后停止相机时发生异常。") try: camera.close() except Exception: logger.exception("初始化失败后关闭相机句柄时发生异常。") raise CameraUnavailableError( "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。" ) from exc self._camera = camera return camera def _build_still_configuration(self, camera: Any) -> Any: """ 创建静态拍照配置。 默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB, 后续编码 JPEG 会更直接。 如果目标设备不支持指定格式,则自动回退到默认配置。 """ main_config = { "size": (self._settings.width, self._settings.height), } if self._settings.frame_format: main_config["format"] = self._settings.frame_format try: return camera.create_still_configuration(main=main_config) except Exception: if "format" not in main_config: raise logger.warning( "相机不支持指定的帧格式 %s,已回退到默认格式。", self._settings.frame_format, ) main_config.pop("format") return camera.create_still_configuration(main=main_config) def _apply_optional_controls(self, camera: Any, controls: Any) -> None: """ 应用可选相机参数。 对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略: 支持就启用,不支持则记录日志,但不阻断整个服务启动。 """ if self._settings.enable_continuous_autofocus: try: camera.set_controls({"AfMode": controls.AfModeEnum.Continuous}) except Exception: logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True) if self._settings.enable_auto_white_balance: try: camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto}) except Exception: logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True) _camera_service: Optional[CameraService] = None _camera_service_lock = threading.Lock() def get_camera_service() -> CameraService: """ 返回全局单例相机服务。 相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机, 会明显增加拍照延迟,也更容易触发硬件占用问题。 """ global _camera_service if _camera_service is not None: return _camera_service with _camera_service_lock: if _camera_service is None: _camera_service = CameraService(settings.camera) return _camera_service