|
|
@@ -2,7 +2,8 @@ import logging
|
|
|
import threading
|
|
|
import time
|
|
|
from io import BytesIO
|
|
|
-from typing import Any, Optional
|
|
|
+from pathlib import Path
|
|
|
+from typing import Any, Optional, Union
|
|
|
|
|
|
import cv2
|
|
|
from PIL import Image
|
|
|
@@ -13,202 +14,138 @@ logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class CameraUnavailableError(RuntimeError):
|
|
|
- """相机不可用,例如依赖缺失或硬件初始化失败。"""
|
|
|
+ """相机不可用。"""
|
|
|
|
|
|
|
|
|
class CameraCaptureError(RuntimeError):
|
|
|
- """相机抓拍失败,例如采集帧失败或图像编码失败。"""
|
|
|
+ """相机抓拍失败。"""
|
|
|
|
|
|
|
|
|
class CameraService:
|
|
|
- """
|
|
|
- 树莓派相机服务。
|
|
|
-
|
|
|
- 设计上把硬件访问封装到这个类里,而不是直接写在接口函数中,
|
|
|
- 有几个明显好处:
|
|
|
- 1. 路由层只关心 HTTP 请求和响应,职责更清晰。
|
|
|
- 2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。
|
|
|
- 3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, settings: CameraSettings) -> None:
|
|
|
- self._settings = settings
|
|
|
- self._lock = threading.RLock()
|
|
|
+ """树莓派相机服务。"""
|
|
|
+
|
|
|
+ def __init__(self, config: CameraSettings) -> None:
|
|
|
+ self._config = config
|
|
|
self._camera: Optional[Any] = None
|
|
|
+ self._lock = threading.RLock()
|
|
|
|
|
|
@property
|
|
|
def is_initialized(self) -> bool:
|
|
|
- """用于健康检查,判断相机对象是否已经初始化完成。"""
|
|
|
return self._camera is not None
|
|
|
|
|
|
def initialize(self) -> None:
|
|
|
- """
|
|
|
- 显式初始化相机。
|
|
|
-
|
|
|
- 这个方法适合在 FastAPI 启动阶段预热相机,
|
|
|
- 也适合你以后做手动重连、后台巡检时复用。
|
|
|
- """
|
|
|
with self._lock:
|
|
|
- self._ensure_camera()
|
|
|
+ self._get_camera()
|
|
|
|
|
|
def capture_jpeg(self) -> bytes:
|
|
|
- """
|
|
|
- 抓拍一张图片,并直接返回 JPEG 二进制内容。
|
|
|
- """
|
|
|
with self._lock:
|
|
|
- camera = self._ensure_camera()
|
|
|
-
|
|
|
try:
|
|
|
- frame = camera.capture_array()
|
|
|
- frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
+ frame = self._get_camera().capture_array()
|
|
|
except Exception as exc:
|
|
|
- raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc
|
|
|
+ raise CameraCaptureError("相机抓拍失败,请检查相机连接状态。") from exc
|
|
|
|
|
|
- # 当前设备输出 shape=(H, W, 3),PIL 直接接收前3通道颜色正确
|
|
|
if getattr(frame, "ndim", 0) != 3 or frame.shape[2] < 3:
|
|
|
raise CameraCaptureError(
|
|
|
- f"相机返回了无法识别的帧结构: shape={getattr(frame, 'shape', None)}"
|
|
|
+ f"相机返回了异常图像数据: shape={getattr(frame, 'shape', None)}"
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
- image = Image.fromarray(frame[:, :, :3])
|
|
|
-
|
|
|
- # image.save("temp.jpg")
|
|
|
+ rgb_frame = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_BGR2RGB)
|
|
|
+ image = Image.fromarray(rgb_frame)
|
|
|
buffer = BytesIO()
|
|
|
- image.save(
|
|
|
- buffer,
|
|
|
- format="JPEG",
|
|
|
- quality=self._settings.jpeg_quality,
|
|
|
- )
|
|
|
+ image.save(buffer, format="JPEG", quality=self._config.jpeg_quality)
|
|
|
return buffer.getvalue()
|
|
|
except Exception as exc:
|
|
|
- raise CameraCaptureError("图片编码为 JPEG 失败。") from exc
|
|
|
+ raise CameraCaptureError("图片编码失败。") from exc
|
|
|
|
|
|
- def close(self) -> None:
|
|
|
- """
|
|
|
- 关闭相机资源。
|
|
|
+ def capture_to_file(self, output_path: Union[str, Path]) -> Path:
|
|
|
+ output_path = Path(output_path)
|
|
|
+ output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
+ output_path.write_bytes(self.capture_jpeg())
|
|
|
+ return output_path
|
|
|
|
|
|
- FastAPI 服务退出时应调用这个方法,避免相机句柄残留,
|
|
|
- 否则后续重新启动服务时可能出现资源被占用的问题。
|
|
|
- """
|
|
|
+ def close(self) -> None:
|
|
|
with self._lock:
|
|
|
- if self._camera is None:
|
|
|
+ camera = self._camera
|
|
|
+ self._camera = None
|
|
|
+
|
|
|
+ if camera is None:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
- self._camera.stop()
|
|
|
+ camera.stop()
|
|
|
except Exception:
|
|
|
- logger.exception("停止相机时发生异常。")
|
|
|
- finally:
|
|
|
- try:
|
|
|
- self._camera.close()
|
|
|
- except Exception:
|
|
|
- logger.exception("关闭相机句柄时发生异常。")
|
|
|
- finally:
|
|
|
- self._camera = None
|
|
|
-
|
|
|
- def _ensure_camera(self) -> Any:
|
|
|
- """
|
|
|
- 延迟初始化相机。
|
|
|
- """
|
|
|
+ logger.warning("停止相机时发生异常。", exc_info=True)
|
|
|
+
|
|
|
+ try:
|
|
|
+ camera.close()
|
|
|
+ except Exception:
|
|
|
+ logger.warning("关闭相机时发生异常。", exc_info=True)
|
|
|
+
|
|
|
+ def _get_camera(self) -> Any:
|
|
|
if self._camera is not None:
|
|
|
return self._camera
|
|
|
|
|
|
- camera = None
|
|
|
-
|
|
|
try:
|
|
|
from libcamera import controls
|
|
|
from picamera2 import Picamera2
|
|
|
except ImportError as exc:
|
|
|
raise CameraUnavailableError(
|
|
|
- "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。"
|
|
|
+ "当前环境缺少 picamera2 或 libcamera。"
|
|
|
) from exc
|
|
|
|
|
|
+ camera = None
|
|
|
try:
|
|
|
- camera = Picamera2(camera_num=self._settings.camera_index)
|
|
|
- still_configuration = self._build_still_configuration(camera)
|
|
|
- camera.configure(still_configuration)
|
|
|
+ camera = Picamera2(camera_num=self._config.camera_index)
|
|
|
+ try:
|
|
|
+ camera.configure(
|
|
|
+ camera.create_still_configuration(
|
|
|
+ main={
|
|
|
+ "size": (self._config.width, self._config.height),
|
|
|
+ "format": self._config.frame_format,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ )
|
|
|
+ except Exception:
|
|
|
+ logger.warning("相机不支持 %s,已回退默认格式。", self._config.frame_format)
|
|
|
+ camera.configure(
|
|
|
+ camera.create_still_configuration(
|
|
|
+ main={"size": (self._config.width, self._config.height)}
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
camera.start()
|
|
|
|
|
|
- self._apply_optional_controls(camera, controls)
|
|
|
- time.sleep(self._settings.warmup_seconds)
|
|
|
- except Exception as exc:
|
|
|
- if camera is not None:
|
|
|
+ if self._config.enable_continuous_autofocus:
|
|
|
try:
|
|
|
- camera.stop()
|
|
|
+ camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
|
|
|
except Exception:
|
|
|
- logger.exception("初始化失败后停止相机时发生异常。")
|
|
|
+ logger.warning("当前摄像头不支持自动对焦。", exc_info=True)
|
|
|
+
|
|
|
+ if self._config.enable_auto_white_balance:
|
|
|
try:
|
|
|
- camera.close()
|
|
|
+ camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
|
|
|
except Exception:
|
|
|
- logger.exception("初始化失败后关闭相机句柄时发生异常。")
|
|
|
+ logger.warning("当前摄像头不支持自动白平衡。", exc_info=True)
|
|
|
|
|
|
- raise CameraUnavailableError(
|
|
|
- "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。"
|
|
|
- ) from exc
|
|
|
+ time.sleep(self._config.warmup_seconds)
|
|
|
+ except Exception as exc:
|
|
|
+ if camera is not None:
|
|
|
+ try:
|
|
|
+ camera.close()
|
|
|
+ except Exception:
|
|
|
+ logger.warning("相机初始化失败后关闭资源时发生异常。", exc_info=True)
|
|
|
+ raise CameraUnavailableError("树莓派相机初始化失败。") from exc
|
|
|
|
|
|
self._camera = camera
|
|
|
return camera
|
|
|
|
|
|
- def _build_still_configuration(self, camera: Any) -> Any:
|
|
|
- """
|
|
|
- 创建静态拍照配置。
|
|
|
-
|
|
|
- 默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB,
|
|
|
- 后续编码 JPEG 会更直接。
|
|
|
- 如果目标设备不支持指定格式,则自动回退到默认配置。
|
|
|
- """
|
|
|
- main_config = {
|
|
|
- "size": (self._settings.width, self._settings.height),
|
|
|
- }
|
|
|
-
|
|
|
- if self._settings.frame_format:
|
|
|
- main_config["format"] = self._settings.frame_format
|
|
|
-
|
|
|
- try:
|
|
|
- return camera.create_still_configuration(main=main_config)
|
|
|
- except Exception:
|
|
|
- if "format" not in main_config:
|
|
|
- raise
|
|
|
-
|
|
|
- logger.warning(
|
|
|
- "相机不支持指定的帧格式 %s,已回退到默认格式。",
|
|
|
- self._settings.frame_format,
|
|
|
- )
|
|
|
- main_config.pop("format")
|
|
|
- return camera.create_still_configuration(main=main_config)
|
|
|
-
|
|
|
- def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
|
|
|
- """
|
|
|
- 应用可选相机参数。
|
|
|
-
|
|
|
- 对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略:
|
|
|
- 支持就启用,不支持则记录日志,但不阻断整个服务启动。
|
|
|
- """
|
|
|
- if self._settings.enable_continuous_autofocus:
|
|
|
- try:
|
|
|
- camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
|
|
|
- except Exception:
|
|
|
- logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True)
|
|
|
-
|
|
|
- if self._settings.enable_auto_white_balance:
|
|
|
- try:
|
|
|
- camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
|
|
|
- except Exception:
|
|
|
- logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True)
|
|
|
-
|
|
|
|
|
|
_camera_service: Optional[CameraService] = None
|
|
|
_camera_service_lock = threading.Lock()
|
|
|
|
|
|
|
|
|
def get_camera_service() -> CameraService:
|
|
|
- """
|
|
|
- 返回全局单例相机服务。
|
|
|
-
|
|
|
- 相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机,
|
|
|
- 会明显增加拍照延迟,也更容易触发硬件占用问题。
|
|
|
- """
|
|
|
global _camera_service
|
|
|
|
|
|
if _camera_service is not None:
|