camera_service.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import logging
  2. import threading
  3. import time
  4. from io import BytesIO
  5. from typing import Any, Optional
  6. import cv2
  7. from PIL import Image
  8. from app.core.config import CameraSettings, settings
  9. logger = logging.getLogger(__name__)
  10. class CameraUnavailableError(RuntimeError):
  11. """相机不可用,例如依赖缺失或硬件初始化失败。"""
  12. class CameraCaptureError(RuntimeError):
  13. """相机抓拍失败,例如采集帧失败或图像编码失败。"""
  14. class CameraService:
  15. """
  16. 树莓派相机服务。
  17. 设计上把硬件访问封装到这个类里,而不是直接写在接口函数中,
  18. 有几个明显好处:
  19. 1. 路由层只关心 HTTP 请求和响应,职责更清晰。
  20. 2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。
  21. 3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。
  22. """
  23. def __init__(self, settings: CameraSettings) -> None:
  24. self._settings = settings
  25. self._lock = threading.RLock()
  26. self._camera: Optional[Any] = None
  27. @property
  28. def is_initialized(self) -> bool:
  29. """用于健康检查,判断相机对象是否已经初始化完成。"""
  30. return self._camera is not None
  31. def initialize(self) -> None:
  32. """
  33. 显式初始化相机。
  34. 这个方法适合在 FastAPI 启动阶段预热相机,
  35. 也适合你以后做手动重连、后台巡检时复用。
  36. """
  37. with self._lock:
  38. self._ensure_camera()
  39. def capture_jpeg(self) -> bytes:
  40. """
  41. 抓拍一张图片,并直接返回 JPEG 二进制内容。
  42. """
  43. with self._lock:
  44. camera = self._ensure_camera()
  45. try:
  46. frame = camera.capture_array()
  47. frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
  48. except Exception as exc:
  49. raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc
  50. # 当前设备输出 shape=(H, W, 3),PIL 直接接收前3通道颜色正确
  51. if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
  52. raise CameraCaptureError(
  53. f"相机返回了无法识别的帧结构: shape={getattr(frame, 'shape', None)}"
  54. )
  55. try:
  56. image = Image.fromarray(frame[:, :, :3])
  57. # image.save("temp.jpg")
  58. buffer = BytesIO()
  59. image.save(
  60. buffer,
  61. format="JPEG",
  62. quality=self._settings.jpeg_quality,
  63. )
  64. return buffer.getvalue()
  65. except Exception as exc:
  66. raise CameraCaptureError("图片编码为 JPEG 失败。") from exc
  67. def close(self) -> None:
  68. """
  69. 关闭相机资源。
  70. FastAPI 服务退出时应调用这个方法,避免相机句柄残留,
  71. 否则后续重新启动服务时可能出现资源被占用的问题。
  72. """
  73. with self._lock:
  74. if self._camera is None:
  75. return
  76. try:
  77. self._camera.stop()
  78. except Exception:
  79. logger.exception("停止相机时发生异常。")
  80. finally:
  81. try:
  82. self._camera.close()
  83. except Exception:
  84. logger.exception("关闭相机句柄时发生异常。")
  85. finally:
  86. self._camera = None
  87. def _ensure_camera(self) -> Any:
  88. """
  89. 延迟初始化相机。
  90. """
  91. if self._camera is not None:
  92. return self._camera
  93. camera = None
  94. try:
  95. from libcamera import controls
  96. from picamera2 import Picamera2
  97. except ImportError as exc:
  98. raise CameraUnavailableError(
  99. "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。"
  100. ) from exc
  101. try:
  102. camera = Picamera2(camera_num=self._settings.camera_index)
  103. still_configuration = self._build_still_configuration(camera)
  104. camera.configure(still_configuration)
  105. camera.start()
  106. self._apply_optional_controls(camera, controls)
  107. time.sleep(self._settings.warmup_seconds)
  108. except Exception as exc:
  109. if camera is not None:
  110. try:
  111. camera.stop()
  112. except Exception:
  113. logger.exception("初始化失败后停止相机时发生异常。")
  114. try:
  115. camera.close()
  116. except Exception:
  117. logger.exception("初始化失败后关闭相机句柄时发生异常。")
  118. raise CameraUnavailableError(
  119. "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。"
  120. ) from exc
  121. self._camera = camera
  122. return camera
  123. def _build_still_configuration(self, camera: Any) -> Any:
  124. """
  125. 创建静态拍照配置。
  126. 默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB,
  127. 后续编码 JPEG 会更直接。
  128. 如果目标设备不支持指定格式,则自动回退到默认配置。
  129. """
  130. main_config = {
  131. "size": (self._settings.width, self._settings.height),
  132. }
  133. if self._settings.frame_format:
  134. main_config["format"] = self._settings.frame_format
  135. try:
  136. return camera.create_still_configuration(main=main_config)
  137. except Exception:
  138. if "format" not in main_config:
  139. raise
  140. logger.warning(
  141. "相机不支持指定的帧格式 %s,已回退到默认格式。",
  142. self._settings.frame_format,
  143. )
  144. main_config.pop("format")
  145. return camera.create_still_configuration(main=main_config)
  146. def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
  147. """
  148. 应用可选相机参数。
  149. 对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略:
  150. 支持就启用,不支持则记录日志,但不阻断整个服务启动。
  151. """
  152. if self._settings.enable_continuous_autofocus:
  153. try:
  154. camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
  155. except Exception:
  156. logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True)
  157. if self._settings.enable_auto_white_balance:
  158. try:
  159. camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
  160. except Exception:
  161. logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True)
  162. _camera_service: Optional[CameraService] = None
  163. _camera_service_lock = threading.Lock()
  164. def get_camera_service() -> CameraService:
  165. """
  166. 返回全局单例相机服务。
  167. 相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机,
  168. 会明显增加拍照延迟,也更容易触发硬件占用问题。
  169. """
  170. global _camera_service
  171. if _camera_service is not None:
  172. return _camera_service
  173. with _camera_service_lock:
  174. if _camera_service is None:
  175. _camera_service = CameraService(settings.camera)
  176. return _camera_service