camera_service.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. import logging
  2. import threading
  3. import time
  4. from io import BytesIO
  5. from typing import Any, Optional
  6. import cv2
  7. from PIL import Image
  8. from app.core.config import CameraSettings, settings
  9. logger = logging.getLogger(__name__)
  10. class CameraUnavailableError(RuntimeError):
  11. """相机不可用,例如依赖缺失或硬件初始化失败。"""
  12. class CameraCaptureError(RuntimeError):
  13. """相机抓拍失败,例如采集帧失败或图像编码失败。"""
  14. class CameraService:
  15. """
  16. 树莓派相机服务。
  17. 设计上把硬件访问封装到这个类里,而不是直接写在接口函数中,
  18. 有几个明显好处:
  19. 1. 路由层只关心 HTTP 请求和响应,职责更清晰。
  20. 2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。
  21. 3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。
  22. """
  23. def __init__(self, settings: CameraSettings) -> None:
  24. self._settings = settings
  25. self._lock = threading.RLock()
  26. self._camera: Optional[Any] = None
  27. @property
  28. def is_initialized(self) -> bool:
  29. """用于健康检查,判断相机对象是否已经初始化完成。"""
  30. return self._camera is not None
  31. def initialize(self) -> None:
  32. """
  33. 显式初始化相机。
  34. 这个方法适合在 FastAPI 启动阶段预热相机,
  35. 也适合你以后做手动重连、后台巡检时复用。
  36. """
  37. with self._lock:
  38. self._ensure_camera()
  39. def capture_jpeg(self) -> bytes:
  40. """
  41. 抓拍一张图片,并直接返回 JPEG 二进制内容。
  42. 接口层拿到这个字节流之后,就可以直接通过 HTTP 返回
  43. """
  44. with self._lock:
  45. camera = self._ensure_camera()
  46. try:
  47. frame = camera.capture_array()
  48. except Exception as exc:
  49. raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc
  50. rgb_frame = self._normalize_frame(frame)
  51. try:
  52. image = Image.fromarray(rgb_frame)
  53. buffer = BytesIO()
  54. image.save(
  55. buffer,
  56. format="JPEG",
  57. quality=self._settings.jpeg_quality,
  58. )
  59. return buffer.getvalue()
  60. except Exception as exc:
  61. raise CameraCaptureError("图片编码为 JPEG 失败。") from exc
  62. def close(self) -> None:
  63. """
  64. 关闭相机资源。
  65. FastAPI 服务退出时应调用这个方法,避免相机句柄残留,
  66. 否则后续重新启动服务时可能出现资源被占用的问题。
  67. """
  68. with self._lock:
  69. if self._camera is None:
  70. return
  71. try:
  72. self._camera.stop()
  73. except Exception:
  74. logger.exception("停止相机时发生异常。")
  75. finally:
  76. try:
  77. self._camera.close()
  78. except Exception:
  79. logger.exception("关闭相机句柄时发生异常。")
  80. finally:
  81. self._camera = None
  82. def _ensure_camera(self) -> Any:
  83. """
  84. 延迟初始化相机。
  85. """
  86. if self._camera is not None:
  87. return self._camera
  88. camera = None
  89. try:
  90. from libcamera import controls
  91. from picamera2 import Picamera2
  92. except ImportError as exc:
  93. raise CameraUnavailableError(
  94. "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。"
  95. ) from exc
  96. try:
  97. camera = Picamera2(camera_num=self._settings.camera_index)
  98. still_configuration = self._build_still_configuration(camera)
  99. camera.configure(still_configuration)
  100. camera.start()
  101. self._apply_optional_controls(camera, controls)
  102. time.sleep(self._settings.warmup_seconds)
  103. except Exception as exc:
  104. if camera is not None:
  105. try:
  106. camera.stop()
  107. except Exception:
  108. logger.exception("初始化失败后停止相机时发生异常。")
  109. try:
  110. camera.close()
  111. except Exception:
  112. logger.exception("初始化失败后关闭相机句柄时发生异常。")
  113. raise CameraUnavailableError(
  114. "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。"
  115. ) from exc
  116. self._camera = camera
  117. return camera
  118. def _build_still_configuration(self, camera: Any) -> Any:
  119. """
  120. 创建静态拍照配置。
  121. 默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB,
  122. 后续编码 JPEG 会更直接。
  123. 如果目标设备不支持指定格式,则自动回退到默认配置。
  124. """
  125. main_config = {
  126. "size": (self._settings.width, self._settings.height),
  127. }
  128. if self._settings.frame_format:
  129. main_config["format"] = self._settings.frame_format
  130. try:
  131. return camera.create_still_configuration(main=main_config)
  132. except Exception:
  133. if "format" not in main_config:
  134. raise
  135. logger.warning(
  136. "相机不支持指定的帧格式 %s,已回退到默认格式。",
  137. self._settings.frame_format,
  138. )
  139. main_config.pop("format")
  140. return camera.create_still_configuration(main=main_config)
  141. def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
  142. """
  143. 应用可选相机参数。
  144. 对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略:
  145. 支持就启用,不支持则记录日志,但不阻断整个服务启动。
  146. """
  147. if self._settings.enable_continuous_autofocus:
  148. try:
  149. camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
  150. except Exception:
  151. logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True)
  152. if self._settings.enable_auto_white_balance:
  153. try:
  154. camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
  155. except Exception:
  156. logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True)
  157. def _normalize_frame(self, frame: Any) -> Any:
  158. """
  159. 统一处理相机返回的帧格式,最终转换成适合 JPEG 编码的 RGB 数据。
  160. `Picamera2` 在不同配置下,返回的数组通道数可能不同:
  161. - 4 通道:常见于 RGBA
  162. - 3 通道:通常是 RGB
  163. - 1 通道:灰度图
  164. """
  165. if frame is None:
  166. raise CameraCaptureError("相机返回了空帧。")
  167. if getattr(frame, "ndim", 0) == 2:
  168. return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
  169. if getattr(frame, "ndim", 0) != 3:
  170. raise CameraCaptureError("相机返回了无法识别的帧结构。")
  171. channel_count = frame.shape[2]
  172. if channel_count == 4:
  173. return cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
  174. if channel_count == 3:
  175. return frame
  176. if channel_count == 1:
  177. return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
  178. raise CameraCaptureError(f"暂不支持 {channel_count} 通道的图像数据。")
  179. _camera_service: Optional[CameraService] = None
  180. _camera_service_lock = threading.Lock()
  181. def get_camera_service() -> CameraService:
  182. """
  183. 返回全局单例相机服务。
  184. 相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机,
  185. 会明显增加拍照延迟,也更容易触发硬件占用问题。
  186. """
  187. global _camera_service
  188. if _camera_service is not None:
  189. return _camera_service
  190. with _camera_service_lock:
  191. if _camera_service is None:
  192. _camera_service = CameraService(settings.camera)
  193. return _camera_service