import logging
import threading
import time
from io import BytesIO
from pathlib import Path
from typing import Any, Optional, Union

import cv2
from PIL import Image

from app.core.config import CameraSettings, settings

logger = logging.getLogger(__name__)


class CameraUnavailableError(RuntimeError):
    """Raised when the camera libraries are missing or the camera fails to initialise."""


class CameraCaptureError(RuntimeError):
    """Raised when grabbing a frame or encoding it to JPEG fails."""


class CameraService:
    """Lazily initialised wrapper around a Picamera2 camera.

    The camera handle is created on first use and cached. Access to the
    handle is serialised through a re-entrant lock.
    """

    def __init__(self, config: CameraSettings) -> None:
        """Store the configuration; the camera itself is not opened here."""
        self._config = config
        self._camera: Optional[Any] = None
        self._lock = threading.RLock()

    @property
    def is_initialized(self) -> bool:
        """Return True when a camera handle is currently cached."""
        return self._camera is not None

    def initialize(self) -> None:
        """Open and configure the camera if that has not happened yet.

        Raises:
            CameraUnavailableError: If the camera libraries cannot be
                imported or the camera fails to initialise.
        """
        with self._lock:
            self._get_camera()

    def capture_jpeg(self) -> bytes:
        """Capture one frame and return it encoded as JPEG bytes.

        The camera is initialised on demand if necessary.

        Raises:
            CameraCaptureError: If the frame cannot be captured (this
                includes a failed on-demand initialisation, whose
                ``CameraUnavailableError`` is chained as the cause), if the
                returned frame does not have three dimensions with at least
                three channels, or if JPEG encoding fails.
        """
        with self._lock:
            try:
                frame = self._get_camera().capture_array()
            except Exception as exc:
                raise CameraCaptureError("相机抓拍失败,请检查相机连接状态。") from exc

        # From here on only the local ``frame`` is used, so the camera lock
        # does not need to be held while validating and encoding.
        if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
            raise CameraCaptureError(
                f"相机返回了异常图像数据: shape={getattr(frame, 'shape', None)}"
            )

        try:
            # Keep the first three channels (drops a possible 4th channel)
            # and swap channel order before handing the data to Pillow.
            # NOTE(review): this assumes the frame is BGR-ordered for the
            # configured ``frame_format`` -- confirm against the camera config.
            rgb_frame = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_BGR2RGB)
            image = Image.fromarray(rgb_frame)
            buffer = BytesIO()
            image.save(buffer, format="JPEG", quality=self._config.jpeg_quality)
            return buffer.getvalue()
        except Exception as exc:
            raise CameraCaptureError("图片编码失败。") from exc

    def capture_to_file(self, output_path: Union[str, Path]) -> Path:
        """Capture one frame and write it to ``output_path`` as JPEG.

        Missing parent directories are created.

        Args:
            output_path: Destination file path.

        Returns:
            The destination as a ``Path``.

        Raises:
            CameraCaptureError: If capturing or encoding the frame fails.
        """
        output_path = Path(output_path)
        # Capture first so that a failed capture does not leave freshly
        # created, empty directories behind.
        jpeg_bytes = self.capture_jpeg()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(jpeg_bytes)
        return output_path

    def close(self) -> None:
        """Stop and close the cached camera, if any.

        Errors raised by ``stop()`` or ``close()`` are logged as warnings
        and not propagated. The cached handle is cleared before shutdown so
        a later call re-initialises the camera.
        """
        with self._lock:
            camera = self._camera
            self._camera = None
            if camera is None:
                return

            try:
                camera.stop()
            except Exception:
                logger.warning("停止相机时发生异常。", exc_info=True)

            try:
                camera.close()
            except Exception:
                logger.warning("关闭相机时发生异常。", exc_info=True)

    def _get_camera(self) -> Any:
        """Return the cached camera, creating and starting it on first use.

        Within this class it is only called while ``self._lock`` is held.

        Raises:
            CameraUnavailableError: If ``libcamera``/``picamera2`` cannot be
                imported, or if creating, configuring or starting the camera
                fails.
        """
        if self._camera is not None:
            return self._camera

        # Imported lazily so the module can be loaded on machines without
        # the Raspberry Pi camera stack.
        try:
            from libcamera import controls
            from picamera2 import Picamera2
        except ImportError as exc:
            raise CameraUnavailableError(
                "当前环境缺少 picamera2 或 libcamera。"
            ) from exc

        camera = None
        try:
            camera = Picamera2(camera_num=self._config.camera_index)

            try:
                camera.configure(
                    camera.create_still_configuration(
                        main={
                            "size": (self._config.width, self._config.height),
                            "format": self._config.frame_format,
                        }
                    )
                )
            except Exception:
                # Retry without an explicit pixel format. The traceback is
                # logged because the failure may have a different cause
                # than an unsupported format.
                logger.warning(
                    "相机不支持 %s,已回退默认格式。",
                    self._config.frame_format,
                    exc_info=True,
                )
                camera.configure(
                    camera.create_still_configuration(
                        main={"size": (self._config.width, self._config.height)}
                    )
                )

            camera.start()

            # Optional controls are best effort: a failure is logged and
            # initialisation continues.
            if self._config.enable_continuous_autofocus:
                try:
                    camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
                except Exception:
                    logger.warning("当前摄像头不支持自动对焦。", exc_info=True)

            if self._config.enable_auto_white_balance:
                try:
                    camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
                except Exception:
                    logger.warning("当前摄像头不支持自动白平衡。", exc_info=True)

            # Pause for the configured warm-up time before the first capture.
            time.sleep(self._config.warmup_seconds)
        except Exception as exc:
            # Release the partially initialised camera before reporting.
            if camera is not None:
                try:
                    camera.close()
                except Exception:
                    logger.warning("相机初始化失败后关闭资源时发生异常。", exc_info=True)
            raise CameraUnavailableError("树莓派相机初始化失败。") from exc

        self._camera = camera
        return camera


_camera_service: Optional[CameraService] = None
_camera_service_lock = threading.Lock()


def get_camera_service() -> CameraService:
    """Return the module-wide ``CameraService``, creating it on first call.

    The instance is built from ``settings.camera``. Creation happens under
    ``_camera_service_lock``; once the instance exists it is returned
    without taking the lock.
    """
    global _camera_service
    if _camera_service is not None:
        return _camera_service

    with _camera_service_lock:
        # Re-check under the lock: another thread may have created the
        # instance between the first check and acquiring the lock.
        if _camera_service is None:
            _camera_service = CameraService(settings.camera)
        return _camera_service