AnlaAnla před 3 týdny
revize
9b965a02de

+ 1 - 0
.env

@@ -0,0 +1 @@
+APP_TITLE="Raspberry Pi Camera API 测试"

+ 10 - 0
.idea/.gitignore

@@ -0,0 +1,10 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 已忽略包含查询文件的默认文件夹
+/queries/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/

+ 20 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,20 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+      <Languages>
+        <language minSize="102" name="Python" />
+      </Languages>
+    </inspection_tool>
+    <inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+      <option name="ignoredErrors">
+        <list>
+          <option value="N803" />
+          <option value="N802" />
+          <option value="N806" />
+        </list>
+      </option>
+    </inspection_tool>
+  </profile>
+</component>

+ 6 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="USE_PROJECT_PROFILE" value="false" />
+    <version value="1.0" />
+  </settings>
+</component>

+ 7 - 0
.idea/misc.xml

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Black">
+    <option name="sdkName" value="C:\Code\Miniconda3\envs\pytorch" />
+  </component>
+  <component name="ProjectRootManager" version="2" project-jdk-name="Remote 3.9 (sftp://pi@192.168.77.108:22/usr/bin/python3)" project-jdk-type="Python SDK" />
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/raspi_cam_arm.iml" filepath="$PROJECT_DIR$/.idea/raspi_cam_arm.iml" />
+    </modules>
+  </component>
+</project>

+ 8 - 0
.idea/raspi_cam_arm.iml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="jdk" jdkName="Remote 3.9 (sftp://pi@192.168.77.108:22/usr/bin/python3)" jdkType="Python SDK" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 35 - 0
Test/test01.py

@@ -0,0 +1,35 @@
+import picamera2
+from libcamera import controls
+from PIL import Image
+import cv2
+import time
+
+picam2 = picamera2.Picamera2()
+picam2.configure(picam2.create_still_configuration(
+    main={"size": (2048, 2048)}
+    # main={"size": (1800, 1800)}
+))
+picam2.start()  # 开始相机流
+# 设置自动对焦,如果摄像头支持
+try:
+    picam2.set_controls({"AfMode": controls.AfModeEnum.Continuous})
+except Exception as e:
+    print('该摄像头无法自动对焦, 请手动对焦: {}'.format(e))
+
+# 设置自动白平衡
+try:
+    picam2.set_controls({"AwbMode": controls.AwbModeEnum.Auto})  # 自动白平衡
+except Exception as e:
+    print('设置自动白平衡失败: {}'.format(e))
+
+time.sleep(0.1)  # 等待相机初始化
+
+frame = picam2.capture_array()  # 捕获图像
+
+# 打印图像信息
+print(frame)
+print(frame.shape)  # 这里使用 shape 来查看图像的尺寸
+
+frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
+# 保存图片查看结果
+cv2.imwrite("test.jpg", frame)

+ 4 - 0
Test/test02.py

@@ -0,0 +1,4 @@
+import os
+
+t = os.getenv("APP_TITLE", "Raspberry Pi Camera API")
+print(t)

+ 0 - 0
app/__init__.py


+ 8 - 0
app/api/__init__.py

@@ -0,0 +1,8 @@
+from fastapi import APIRouter
+
+from app.api.camera import router as camera_router
+
+api_router = APIRouter()
+api_router.include_router(camera_router)
+
+__all__ = ["api_router"]

+ 47 - 0
app/api/camera.py

@@ -0,0 +1,47 @@
+from datetime import datetime
+
+from fastapi import APIRouter, HTTPException, Query, Response
+
+from app.services import (
+    CameraCaptureError,
+    CameraUnavailableError,
+    get_camera_service,
+)
+
+router = APIRouter(
+    prefix="/camera",
+    tags=["camera"],
+)
+
+
+@router.api_route("/capture", methods=["GET"], summary="拍照并直接返回图片")
+def capture_image(
+        download: bool = Query(
+            default=False,
+            description="是否以附件下载方式返回图片。默认 False,浏览器会直接预览。",
+        ),
+) -> Response:
+    """
+    拍照接口。
+
+    调用这个接口时,服务会即时触发树莓派相机拍照,
+    然后把最新拍到的图片直接作为 HTTP 响应返回。
+    """
+    camera_service = get_camera_service()
+
+    try:
+        image_bytes = camera_service.capture_jpeg()
+    except (CameraUnavailableError, CameraCaptureError) as exc:
+        raise HTTPException(status_code=503, detail=str(exc)) from exc
+
+    filename = f"capture_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
+    disposition = "attachment" if download else "inline"
+
+    return Response(
+        content=image_bytes,
+        media_type="image/jpeg",
+        headers={
+            "Content-Disposition": f'{disposition}; filename="{filename}"',
+            "Cache-Control": "no-store",
+        },
+    )

+ 0 - 0
app/core/__init__.py


+ 124 - 0
app/core/config.py

@@ -0,0 +1,124 @@
+import os
+from dataclasses import dataclass
+from functools import lru_cache
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+def _get_bool(name: str, default: bool) -> bool:
+    value = os.getenv(name)
+    if value is None:
+        return default
+
+    return value.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _get_int(name: str, default: int) -> int:
+    """从环境变量中读取整数配置。"""
+    value = os.getenv(name)
+    if value is None:
+        return default
+
+    try:
+        return int(value)
+    except ValueError:
+        return default
+
+
+def _get_float(name: str, default: float) -> float:
+    """从环境变量中读取浮点数配置。"""
+    value = os.getenv(name)
+    if value is None:
+        return default
+
+    try:
+        return float(value)
+    except ValueError:
+        return default
+
+
+@dataclass(frozen=True)
+class AppSettings:
+    """
+    FastAPI 服务本身的运行配置。
+    """
+
+    title: str
+    version: str
+    description: str
+    host: str
+    port: int
+
+    @classmethod
+    def from_env(cls) -> "AppSettings":
+        return cls(
+            title=os.getenv("APP_TITLE", "Raspberry Pi Camera API"),
+            version=os.getenv("APP_VERSION", "0.1.0"),
+            description=os.getenv(
+                "APP_DESCRIPTION",
+                "用于树莓派相机抓拍的 FastAPI 服务。",
+            ),
+            host=os.getenv("APP_HOST", "0.0.0.0"),
+            port=_get_int("APP_PORT", 8002)
+        )
+
+
+@dataclass(frozen=True)
+class CameraSettings:
+    """
+    树莓派相机的集中配置。
+    """
+
+    camera_index: int
+    width: int
+    height: int
+    frame_format: str
+    jpeg_quality: int
+    warmup_seconds: float
+    enable_continuous_autofocus: bool
+    enable_auto_white_balance: bool
+
+    @classmethod
+    def from_env(cls) -> "CameraSettings":
+        return cls(
+            camera_index=_get_int("CAMERA_INDEX", 0),
+            width=_get_int("CAMERA_WIDTH", 2048),
+            height=_get_int("CAMERA_HEIGHT", 2048),
+            frame_format=os.getenv("CAMERA_FRAME_FORMAT", "RGB888"),
+            jpeg_quality=_get_int("CAMERA_JPEG_QUALITY", 95),
+            warmup_seconds=_get_float("CAMERA_WARMUP_SECONDS", 0.05),
+            enable_continuous_autofocus=_get_bool(
+                "CAMERA_ENABLE_CONTINUOUS_AUTOFOCUS",
+                True,
+            ),
+            enable_auto_white_balance=_get_bool(
+                "CAMERA_ENABLE_AUTO_WHITE_BALANCE",
+                True,
+            ),
+        )
+
+
+@dataclass(frozen=True)
+class Settings:
+    """项目总配置入口,统一汇总服务配置与相机配置。"""
+
+    app: AppSettings
+    camera: CameraSettings
+
+
+@lru_cache(maxsize=1)
+def get_settings() -> Settings:
+    """
+    返回单例配置对象。
+
+    使用缓存的原因是:
+    1. 避免每次请求都重复读取环境变量。
+    2. 让整个项目里的配置来源保持一致。
+    """
+    return Settings(
+        app=AppSettings.from_env(),
+        camera=CameraSettings.from_env(),
+    )
+
+settings = get_settings()

+ 77 - 0
app/main.py

@@ -0,0 +1,77 @@
+import logging
+from contextlib import asynccontextmanager
+
+from fastapi import FastAPI
+
+from app.api import api_router
+from app.services import CameraUnavailableError, get_camera_service
+from app.core.config import settings
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@asynccontextmanager
+async def lifespan(_: FastAPI):
+    """
+    FastAPI 生命周期管理。
+
+    启动阶段会尝试预热相机,这样第一次请求拍照时更快。
+    但这里不会因为相机异常而阻止整个服务启动,
+    因为有时你可能还需要先看接口文档、做健康检查或排查环境。
+    """
+    camera_service = get_camera_service()
+
+    try:
+        camera_service.initialize()
+        logger.info("树莓派相机预热完成。")
+    except CameraUnavailableError as exc:
+        logger.warning("相机预热失败,拍照接口暂时不可用:%s", exc)
+
+    try:
+        yield
+    finally:
+        camera_service.close()
+        logger.info("相机资源已关闭。")
+
+
+app = FastAPI(
+    title=settings.app.title,
+    version=settings.app.version,
+    description=settings.app.description,
+    lifespan=lifespan,
+)
+
+app.include_router(api_router, prefix="/api")
+
+
+@app.get("/", summary="服务入口")
+def read_root() -> dict[str, str]:
+    """
+    服务入口说明。
+
+    这里返回最基础的服务信息,方便你确认服务已启动,
+    也方便后续再逐步加入更多模块说明。
+    """
+    return {
+        "message": "Raspberry Pi Camera API is running.",
+        "docs": "/docs",
+    }
+
+
+@app.get("/health", summary="健康检查")
+def health_check() -> dict[str, object]:
+    """
+    健康检查接口。
+
+    这个接口不强依赖相机真实可用,只返回服务当前状态和相机是否已初始化。
+    在部署脚本、容器探针或反向代理检查中都比较实用。
+    """
+    camera_service = get_camera_service()
+
+    return {
+        "status": "ok",
+        "camera_initialized": camera_service.is_initialized,
+        "app_name": settings.app.title,
+        "app_version": settings.app.version,
+    }

+ 13 - 0
app/services/__init__.py

@@ -0,0 +1,13 @@
+from app.services.camera_service import (
+    CameraCaptureError,
+    CameraService,
+    CameraUnavailableError,
+    get_camera_service,
+)
+
+__all__ = [
+    "CameraCaptureError",
+    "CameraService",
+    "CameraUnavailableError",
+    "get_camera_service",
+]

+ 244 - 0
app/services/camera_service.py

@@ -0,0 +1,244 @@
+import logging
+import threading
+import time
+from io import BytesIO
+from typing import Any, Optional
+
+import cv2
+from PIL import Image
+
+from app.core.config import CameraSettings, settings
+
+logger = logging.getLogger(__name__)
+
+
+class CameraUnavailableError(RuntimeError):
+    """相机不可用,例如依赖缺失或硬件初始化失败。"""
+
+
+class CameraCaptureError(RuntimeError):
+    """相机抓拍失败,例如采集帧失败或图像编码失败。"""
+
+
+class CameraService:
+    """
+    树莓派相机服务。
+
+    设计上把硬件访问封装到这个类里,而不是直接写在接口函数中,
+    有几个明显好处:
+    1. 路由层只关心 HTTP 请求和响应,职责更清晰。
+    2. 以后如果要增加录像、参数调节、图片落盘等功能,可以继续在这里扩展。
+    3. 相机对象是重量级资源,做成服务之后更容易复用和统一关闭。
+    """
+
+    def __init__(self, settings: CameraSettings) -> None:
+        self._settings = settings
+        self._lock = threading.RLock()
+        self._camera: Optional[Any] = None
+
+    @property
+    def is_initialized(self) -> bool:
+        """用于健康检查,判断相机对象是否已经初始化完成。"""
+        return self._camera is not None
+
+    def initialize(self) -> None:
+        """
+        显式初始化相机。
+
+        这个方法适合在 FastAPI 启动阶段预热相机,
+        也适合你以后做手动重连、后台巡检时复用。
+        """
+        with self._lock:
+            self._ensure_camera()
+
+    def capture_jpeg(self) -> bytes:
+        """
+        抓拍一张图片,并直接返回 JPEG 二进制内容。
+
+        接口层拿到这个字节流之后,就可以直接通过 HTTP 返回
+        """
+        with self._lock:
+            camera = self._ensure_camera()
+
+            try:
+                frame = camera.capture_array()
+            except Exception as exc:
+                raise CameraCaptureError("相机抓拍失败,请检查相机连接和占用状态。") from exc
+
+            rgb_frame = self._normalize_frame(frame)
+
+            try:
+                image = Image.fromarray(rgb_frame)
+                buffer = BytesIO()
+                image.save(
+                    buffer,
+                    format="JPEG",
+                    quality=self._settings.jpeg_quality,
+                )
+                return buffer.getvalue()
+            except Exception as exc:
+                raise CameraCaptureError("图片编码为 JPEG 失败。") from exc
+
+    def close(self) -> None:
+        """
+        关闭相机资源。
+
+        FastAPI 服务退出时应调用这个方法,避免相机句柄残留,
+        否则后续重新启动服务时可能出现资源被占用的问题。
+        """
+        with self._lock:
+            if self._camera is None:
+                return
+
+            try:
+                self._camera.stop()
+            except Exception:
+                logger.exception("停止相机时发生异常。")
+            finally:
+                try:
+                    self._camera.close()
+                except Exception:
+                    logger.exception("关闭相机句柄时发生异常。")
+                finally:
+                    self._camera = None
+
+    def _ensure_camera(self) -> Any:
+        """
+        延迟初始化相机。
+        """
+        if self._camera is not None:
+            return self._camera
+
+        camera = None
+
+        try:
+            from libcamera import controls
+            from picamera2 import Picamera2
+        except ImportError as exc:
+            raise CameraUnavailableError(
+                "当前环境缺少树莓派相机依赖,请确认已安装 picamera2 和 libcamera。"
+            ) from exc
+
+        try:
+            camera = Picamera2(camera_num=self._settings.camera_index)
+            still_configuration = self._build_still_configuration(camera)
+            camera.configure(still_configuration)
+            camera.start()
+
+            self._apply_optional_controls(camera, controls)
+            time.sleep(self._settings.warmup_seconds)
+        except Exception as exc:
+            if camera is not None:
+                try:
+                    camera.stop()
+                except Exception:
+                    logger.exception("初始化失败后停止相机时发生异常。")
+                try:
+                    camera.close()
+                except Exception:
+                    logger.exception("初始化失败后关闭相机句柄时发生异常。")
+
+            raise CameraUnavailableError(
+                "树莓派相机初始化失败,请检查相机排线、驱动和权限配置。"
+            ) from exc
+
+        self._camera = camera
+        return camera
+
+    def _build_still_configuration(self, camera: Any) -> Any:
+        """
+        创建静态拍照配置。
+
+        默认优先尝试使用 `RGB888`,这样返回的数组通常就是 3 通道 RGB,
+        后续编码 JPEG 会更直接。
+        如果目标设备不支持指定格式,则自动回退到默认配置。
+        """
+        main_config = {
+            "size": (self._settings.width, self._settings.height),
+        }
+
+        if self._settings.frame_format:
+            main_config["format"] = self._settings.frame_format
+
+        try:
+            return camera.create_still_configuration(main=main_config)
+        except Exception:
+            if "format" not in main_config:
+                raise
+
+            logger.warning(
+                "相机不支持指定的帧格式 %s,已回退到默认格式。",
+                self._settings.frame_format,
+            )
+            main_config.pop("format")
+            return camera.create_still_configuration(main=main_config)
+
+    def _apply_optional_controls(self, camera: Any, controls: Any) -> None:
+        """
+        应用可选相机参数。
+
+        对焦和白平衡不一定所有摄像头都支持,所以这里采用“尽力而为”的策略:
+        支持就启用,不支持则记录日志,但不阻断整个服务启动。
+        """
+        if self._settings.enable_continuous_autofocus:
+            try:
+                camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})
+            except Exception:
+                logger.warning("当前摄像头不支持连续自动对焦,已跳过。", exc_info=True)
+
+        if self._settings.enable_auto_white_balance:
+            try:
+                camera.set_controls({"AwbMode": controls.AwbModeEnum.Auto})
+            except Exception:
+                logger.warning("当前摄像头不支持自动白平衡,已跳过。", exc_info=True)
+
+    def _normalize_frame(self, frame: Any) -> Any:
+        """
+        统一处理相机返回的帧格式,最终转换成适合 JPEG 编码的 RGB 数据。
+
+        `Picamera2` 在不同配置下,返回的数组通道数可能不同:
+        - 4 通道:常见于 RGBA
+        - 3 通道:通常是 RGB
+        - 1 通道:灰度图
+        """
+        if frame is None:
+            raise CameraCaptureError("相机返回了空帧。")
+
+        if getattr(frame, "ndim", 0) == 2:
+            return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
+
+        if getattr(frame, "ndim", 0) != 3:
+            raise CameraCaptureError("相机返回了无法识别的帧结构。")
+
+        channel_count = frame.shape[2]
+        if channel_count == 4:
+            return cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
+        if channel_count == 3:
+            return frame
+        if channel_count == 1:
+            return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
+
+        raise CameraCaptureError(f"暂不支持 {channel_count} 通道的图像数据。")
+
+
+_camera_service: Optional[CameraService] = None
+_camera_service_lock = threading.Lock()
+
+
+def get_camera_service() -> CameraService:
+    """
+    返回全局单例相机服务。
+
+    相机通常只需要维护一个共享实例,否则多个请求反复创建/关闭相机,
+    会明显增加拍照延迟,也更容易触发硬件占用问题。
+    """
+    global _camera_service
+
+    if _camera_service is not None:
+        return _camera_service
+
+    with _camera_service_lock:
+        if _camera_service is None:
+            _camera_service = CameraService(settings.camera)
+
+    return _camera_service

+ 25 - 0
run_raspi_cam_arm.py

@@ -0,0 +1,25 @@
+import uvicorn
+from app.core.config import settings
+import socket
+
+
+def get_host_ip():
+    """
+    查询本机ip地址
+    """
+    try:
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.connect(('8.8.8.8', 80))
+        ip = s.getsockname()[0]
+    except Exception:
+        # 如果没有网络,回退到 host name
+        ip = socket.gethostbyname(socket.gethostname())
+    finally:
+        s.close()
+    return ip
+
+if __name__ == '__main__':
+    ip = get_host_ip()
+    print(f" Docs available at: http://{ip}:{settings.app.port}/docs")
+
+    uvicorn.run("app.main:app", host=settings.app.host, port=settings.app.port, reload=True)