# camera_service.py (5.0 KB)
  1. import logging
  2. import threading
  3. import time
  4. from io import BytesIO
  5. from pathlib import Path
  6. from typing import Any, Optional, Union
  7. import cv2
  8. from PIL import Image
  9. from app.core.config import CameraSettings, settings
  10. logger = logging.getLogger(__name__)
  11. class CameraUnavailableError(RuntimeError):
  12. """相机不可用。"""
  13. class CameraCaptureError(RuntimeError):
  14. """相机抓拍失败。"""
  15. class CameraService:
  16. """树莓派相机服务。"""
  17. def __init__(self, config: CameraSettings) -> None:
  18. self._config = config
  19. self._camera: Optional[Any] = None
  20. self._lock = threading.RLock()
  21. @property
  22. def is_initialized(self) -> bool:
  23. return self._camera is not None
  24. def initialize(self) -> None:
  25. with self._lock:
  26. self._get_camera()
  27. def capture_jpeg(self) -> bytes:
  28. with self._lock:
  29. try:
  30. frame = self._get_camera().capture_array()
  31. except Exception as exc:
  32. raise CameraCaptureError("相机抓拍失败,请检查相机连接状态。") from exc
  33. if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
  34. raise CameraCaptureError(
  35. f"相机返回了异常图像数据: shape={getattr(frame, 'shape', None)}"
  36. )
  37. try:
  38. rgb_frame = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_BGR2RGB)
  39. image = Image.fromarray(rgb_frame)
  40. buffer = BytesIO()
  41. image.save(buffer, format="JPEG", quality=self._config.jpeg_quality)
  42. return buffer.getvalue()
  43. except Exception as exc:
  44. raise CameraCaptureError("图片编码失败。") from exc
  45. def capture_to_file(self, output_path: Union[str, Path]) -> Path:
  46. output_path = Path(output_path)
  47. output_path.parent.mkdir(parents=True, exist_ok=True)
  48. output_path.write_bytes(self.capture_jpeg())
  49. return output_path
  50. def close(self) -> None:
  51. with self._lock:
  52. camera = self._camera
  53. self._camera = None
  54. if camera is None:
  55. return
  56. try:
  57. camera.stop()
  58. except Exception:
  59. logger.warning("停止相机时发生异常。", exc_info=True)
  60. try:
  61. camera.close()
  62. except Exception:
  63. logger.warning("关闭相机时发生异常。", exc_info=True)
  64. def _get_camera(self) -> Any:
  65. if self._camera is not None:
  66. return self._camera
  67. try:
  68. from libcamera import controls
  69. from picamera2 import Picamera2
  70. except ImportError as exc:
  71. raise CameraUnavailableError(
  72. "当前环境缺少 picamera2 或 libcamera。"
  73. ) from exc
  74. camera = None
  75. try:
  76. camera = Picamera2(camera_num=self._config.camera_index)
  77. try:
  78. camera.configure(
  79. camera.create_still_configuration(
  80. main={
  81. "size": (self._config.width, self._config.height),
  82. "format": self._config.frame_format,
  83. }
  84. )
  85. )
  86. except Exception:
  87. logger.warning("相机不支持 %s,已回退默认格式。", self._config.frame_format)
  88. camera.configure(
  89. camera.create_still_configuration(
  90. main={"size": (self._config.width, self._config.height)}
  91. )
  92. )
  93. camera.start()
  94. if self._config.enable_continuous_autofocus:
  95. try:
  96. camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
  97. except Exception:
  98. logger.warning("当前摄像头不支持自动对焦。", exc_info=True)
  99. if self._config.enable_auto_white_balance:
  100. try:
  101. camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
  102. except Exception:
  103. logger.warning("当前摄像头不支持自动白平衡。", exc_info=True)
  104. time.sleep(self._config.warmup_seconds)
  105. except Exception as exc:
  106. if camera is not None:
  107. try:
  108. camera.close()
  109. except Exception:
  110. logger.warning("相机初始化失败后关闭资源时发生异常。", exc_info=True)
  111. raise CameraUnavailableError("树莓派相机初始化失败。") from exc
  112. self._camera = camera
  113. return camera
  114. _camera_service: Optional[CameraService] = None
  115. _camera_service_lock = threading.Lock()
  116. def get_camera_service() -> CameraService:
  117. global _camera_service
  118. if _camera_service is not None:
  119. return _camera_service
  120. with _camera_service_lock:
  121. if _camera_service is None:
  122. _camera_service = CameraService(settings.camera)
  123. return _camera_service