| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import logging
- import threading
- import time
- from io import BytesIO
- from pathlib import Path
- from typing import Any, Optional, Union
- import cv2
- from PIL import Image
- from app.core.config import CameraSettings, settings
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)
class CameraUnavailableError(RuntimeError):
    """The camera is unavailable."""
class CameraCaptureError(RuntimeError):
    """Capturing an image from the camera failed."""
class CameraService:
    """Raspberry Pi camera service backed by Picamera2.

    The camera is opened lazily on first use and the instance is then reused
    until :meth:`close` is called. ``initialize``, ``capture_jpeg`` and
    ``close`` hold a re-entrant lock while they touch the device.
    """

    def __init__(self, config: CameraSettings) -> None:
        """Store the settings; the device itself is not opened here."""
        self._config = config
        self._camera: Optional[Any] = None
        self._lock = threading.RLock()

    @property
    def is_initialized(self) -> bool:
        """Return ``True`` while a camera instance is held."""
        return self._camera is not None

    def initialize(self) -> None:
        """Open and configure the camera if that has not happened yet.

        Raises:
            CameraUnavailableError: picamera2/libcamera cannot be imported, or
                opening/configuring/starting the device fails.
        """
        with self._lock:
            self._get_camera()

    def capture_jpeg(self) -> bytes:
        """Capture one frame and return it encoded as JPEG bytes.

        Raises:
            CameraCaptureError: the frame could not be obtained, the returned
                data is not a 3-dimensional array with at least three
                channels, or JPEG encoding failed. A failure of the lazy
                camera initialisation is also reported as this error, with
                the original exception attached as ``__cause__``.
        """
        with self._lock:
            try:
                frame = self._get_camera().capture_array()
            except Exception as exc:
                raise CameraCaptureError("相机抓拍失败,请检查相机连接状态。") from exc

            if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
                raise CameraCaptureError(
                    f"相机返回了异常图像数据: shape={getattr(frame, 'shape', None)}"
                )

            try:
                # Drop any channel beyond the third, then swap channels 0 and 2.
                # NOTE(review): this assumes the frame is in B, G, R order;
                # confirm against the configured frame_format and against the
                # default format used when that format is rejected.
                rgb_frame = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_BGR2RGB)
                image = Image.fromarray(rgb_frame)
                buffer = BytesIO()
                image.save(buffer, format="JPEG", quality=self._config.jpeg_quality)
                return buffer.getvalue()
            except Exception as exc:
                raise CameraCaptureError("图片编码失败。") from exc

    def capture_to_file(self, output_path: Union[str, Path]) -> Path:
        """Capture a JPEG and write it to ``output_path``.

        Missing parent directories are created. The capture happens before the
        filesystem is touched, so a failed capture leaves no directories behind.

        Returns:
            The destination as a :class:`~pathlib.Path`.

        Raises:
            CameraCaptureError: propagated from :meth:`capture_jpeg`.
        """
        destination = Path(output_path)
        jpeg_bytes = self.capture_jpeg()
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(jpeg_bytes)
        return destination

    def close(self) -> None:
        """Stop and close the camera, if one is held.

        Errors raised while stopping or closing are logged and swallowed, so
        shutdown is best-effort. Calling this with no camera held does nothing.
        """
        with self._lock:
            camera = self._camera
            # Forget the instance first so a failing stop()/close() cannot
            # leave a half-closed camera registered on the service.
            self._camera = None
            if camera is None:
                return

            try:
                camera.stop()
            except Exception:
                logger.warning("停止相机时发生异常。", exc_info=True)

            try:
                camera.close()
            except Exception:
                logger.warning("关闭相机时发生异常。", exc_info=True)

    def _get_camera(self) -> Any:
        """Return the held camera, creating and starting it on first use.

        This method does not take the lock itself; the callers in this class
        invoke it while holding ``self._lock``.

        Raises:
            CameraUnavailableError: picamera2/libcamera cannot be imported, or
                any step of opening, configuring or starting the camera fails.
        """
        if self._camera is not None:
            return self._camera

        # Imported lazily so the module can be loaded where the camera stack
        # is not installed.
        try:
            from libcamera import controls
            from picamera2 import Picamera2
        except ImportError as exc:
            raise CameraUnavailableError(
                "当前环境缺少 picamera2 或 libcamera。"
            ) from exc

        camera = None
        try:
            camera = Picamera2(camera_num=self._config.camera_index)
            self._configure_still(camera)
            camera.start()
            self._apply_optional_controls(camera, controls)
            time.sleep(self._config.warmup_seconds)
        except Exception as exc:
            if camera is not None:
                try:
                    camera.close()
                except Exception:
                    logger.warning("相机初始化失败后关闭资源时发生异常。", exc_info=True)
            raise CameraUnavailableError("树莓派相机初始化失败。") from exc

        self._camera = camera
        return camera

    def _configure_still(self, camera: Any) -> None:
        """Apply a still configuration, retrying without the format if rejected."""
        size = (self._config.width, self._config.height)
        try:
            camera.configure(
                camera.create_still_configuration(
                    main={"size": size, "format": self._config.frame_format}
                )
            )
        except Exception:
            # Keep the traceback: it records why the requested format failed.
            logger.warning(
                "相机不支持 %s,已回退默认格式。",
                self._config.frame_format,
                exc_info=True,
            )
            camera.configure(
                camera.create_still_configuration(main={"size": size})
            )

    def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
        """Enable the autofocus / white-balance controls requested in the config.

        Each control is best-effort: a failure is logged and otherwise ignored.
        """
        if self._config.enable_continuous_autofocus:
            try:
                camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
            except Exception:
                logger.warning("当前摄像头不支持自动对焦。", exc_info=True)

        if self._config.enable_auto_white_balance:
            try:
                camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
            except Exception:
                logger.warning("当前摄像头不支持自动白平衡。", exc_info=True)
# Process-wide CameraService instance, created on first request.
_camera_service: Optional[CameraService] = None
# Guards creation of the shared instance.
_camera_service_lock = threading.Lock()


def get_camera_service() -> CameraService:
    """Return the shared CameraService, creating it from ``settings.camera`` on first call."""
    global _camera_service

    service = _camera_service
    if service is None:
        with _camera_service_lock:
            # Look again under the lock: another thread may have created the
            # instance between the check above and acquiring the lock.
            service = _camera_service
            if service is None:
                service = CameraService(settings.camera)
                _camera_service = service
    return service
|