Browse Source

ffmpeg视频截取

AnlaAnla 3 days ago
parent
commit
a4ab50ecc4
8 changed files with 633 additions and 87 deletions
  1. 54 0
      Test/minio测试.py
  2. 345 75
      app/api/camera.py
  3. 45 4
      app/core/config.py
  4. 13 0
      app/core/logging_config.py
  5. 11 5
      app/main.py
  6. 3 0
      app/services/__init__.py
  7. 151 0
      app/services/storage.py
  8. 11 3
      run_MonitorCameraServer.py

+ 54 - 0
Test/minio测试.py

@@ -0,0 +1,54 @@
+from minio import Minio
+import os
+
+
+MINIO_ENDPOINT = "192.168.77.249:9000"
+MINIO_ACCESS_KEY = "pZEwCGnpNN05KPnmC2Yh"
+MINIO_SECRET_KEY = "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4"  # 替换为你的 Secret Key
+MINIO_SECURE = False  # 是否使用 https
+MINIO_BUCKET = "grading"
+MINIO_BASE_PREFIX = "monitor_video_data"
+
+DATA_HOST_URL = f"http://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"
+
+
+minio_client = Minio(
+    MINIO_ENDPOINT,
+    access_key=MINIO_ACCESS_KEY,
+    secret_key=MINIO_SECRET_KEY,
+    secure=MINIO_SECURE
+)
+
+
+def get_full_url(path: str) -> str:
+    """将相对路径转换为可以直接打开的 MinIO 绝对 URL"""
+    if not path:
+        return path
+    if str(path).startswith("http"):
+        return path
+    # 移除开头的斜杠防止双斜杠 (如: /Data/xxx -> Data/xxx)
+    clean_path = str(path).lstrip("/\\")
+    return f"{DATA_HOST_URL}/{clean_path}"
+
+print(get_full_url("rsp_test.jpg"))
+
+try:
+    # 2. 检查存储桶是否存在
+    found = minio_client.bucket_exists(MINIO_BUCKET)
+    if not found:
+        minio_client.make_bucket(MINIO_BUCKET)
+        print(f"存储桶 {MINIO_BUCKET} 已创建")
+    else:
+        print(f"存储桶 {MINIO_BUCKET} 已存在")
+
+    object_name = "img_test.jpg"
+    img_path = "./31_front_coaxial_1_0.jpg"
+    cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/")
+
+    minio_client.fput_object(MINIO_BUCKET, cloud_path, img_path, content_type="image/jpeg")
+    print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")
+
+
+
+except Exception as e:
+    print(f"发生错误: {e}")

+ 345 - 75
app/api/camera.py

@@ -1,21 +1,29 @@
-import cv2
+import logging
+import os
+import shutil
+import subprocess
 import time
 import time
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
 from datetime import datetime
-import os
-import uuid
+from threading import Lock, Timer
+from typing import Any, Dict
+
 from fastapi import APIRouter, HTTPException
 from fastapi import APIRouter, HTTPException
 from pydantic import BaseModel
 from pydantic import BaseModel
-from concurrent.futures import ThreadPoolExecutor
-from threading import Lock, Timer
+
 from app.core.config import settings
 from app.core.config import settings
+from app.services.storage import get_storage_service, sanitize_name
 
 
-# -------------------- 配置 --------------------
+logger = logging.getLogger(__name__)
 
 
 executor = ThreadPoolExecutor(max_workers=4)
 executor = ThreadPoolExecutor(max_workers=4)
 lock = Lock()
 lock = Lock()
 
 
-# 摄像头任务状态
-cam_status = {}
+# 摄像头任务状态。当前按摄像头维度维护最新一条任务。
+cam_status: Dict[str, Dict[str, Any]] = {}
+# 任务索引。按 camera_id + task_name 保存最近一条同名任务状态,便于外部服务查询结果。
+task_status_index: Dict[tuple[str, str], Dict[str, Any]] = {}
+active_statuses = {"starting", "recording", "stopping", "uploading"}
 
 
 router = APIRouter()
 router = APIRouter()
 
 
@@ -25,87 +33,319 @@ class TaskRequest(BaseModel):
     task_name: str
     task_name: str
 
 
 
 
-# -------------------- 录像函数 --------------------
-def record_camera(camera_id: str, task_name: str, output_file: str):
-    rtsp_url = settings.CAMERA_CONFIG[camera_id]
-    cap = cv2.VideoCapture(rtsp_url)
-    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # 减少延迟
+def _build_local_file(camera_id: str, task_name: str, started_at: datetime) -> str:
+    """生成本地录像文件路径。"""
+    safe_task_name = sanitize_name(task_name)
+    timestamp = started_at.strftime("%Y-%m-%d_%H-%M-%S")
+    file_name = f"{sanitize_name(camera_id)}_{safe_task_name}_{timestamp}.mp4"
+    return os.path.join(settings.OUTPUT_DIR, file_name)
+
+
+def _build_response(task_status: Dict[str, Any]) -> Dict[str, Any]:
+    """过滤内部字段,构造接口返回内容。"""
+    response = {
+        "status": task_status.get("status"),
+        "camera_id": task_status.get("camera_id"),
+        "task_name": task_status.get("task_name"),
+        "start_time": task_status.get("start_time"),
+        "storage_type": task_status.get("storage_type"),
+        "local_file": task_status.get("local_file"),
+        "bucket": task_status.get("bucket"),
+        "object_name": task_status.get("object_name"),
+        "download_url": task_status.get("download_url"),
+        "stop_reason": task_status.get("stop_reason"),
+        "error": task_status.get("error"),
+        "local_file_deleted": task_status.get("local_file_deleted"),
+    }
+    return {key: value for key, value in response.items() if value is not None}
+
+
+def _build_task_key(camera_id: str, task_name: str) -> tuple[str, str]:
+    """生成任务索引键。"""
+    return camera_id, task_name
+
+
+def _get_ffmpeg_executable() -> str:
+    """解析 FFmpeg 可执行文件路径。"""
+    configured_path = settings.FFMPEG_PATH.strip()
+    if not configured_path:
+        raise RuntimeError("FFMPEG_PATH 未配置")
+
+    if os.path.isfile(configured_path):
+        return configured_path
 
 
-    if not cap.isOpened():
-        raise RuntimeError(f"无法连接摄像头 {camera_id}")
+    resolved_path = shutil.which(configured_path)
+    if resolved_path:
+        return resolved_path
 
 
-    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
-    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-    fps = cap.get(cv2.CAP_PROP_FPS)
-    if fps <= 0: fps = 25
+    raise RuntimeError(
+        f"未找到 FFmpeg 可执行文件,请安装 ffmpeg 或配置 FFMPEG_PATH。当前值: {configured_path}"
+    )
 
 
-    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
-    out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
 
 
-    print(f"[{camera_id}] 任务 {task_name} 开始录制...")
+def _build_ffmpeg_command(rtsp_url: str, local_file: str) -> list[str]:
+    """构造 FFmpeg 录像命令。"""
+    ffmpeg_path = _get_ffmpeg_executable()
+    return [
+        ffmpeg_path,
+        "-y",
+        "-hide_banner",
+        "-loglevel",
+        settings.FFMPEG_LOGLEVEL,
+        "-rtsp_transport",
+        settings.FFMPEG_RTSP_TRANSPORT,
+        "-i",
+        rtsp_url,
+        "-an",
+        "-c:v",
+        "libx264",
+        "-preset",
+        settings.FFMPEG_PRESET,
+        "-pix_fmt",
+        "yuv420p",
+        "-vf",
+        "scale=trunc(iw/2)*2:trunc(ih/2)*2",
+        "-movflags",
+        "+faststart",
+        local_file,
+    ]
 
 
+
+def _set_stop_flag(camera_id: str, stop_reason: str) -> Dict[str, Any] | None:
+    """将任务标记为停止中,由后台线程自行收尾。"""
+    with lock:
+        task_status = cam_status.get(camera_id)
+        if not task_status:
+            return None
+
+        if task_status.get("status") in {"uploaded", "completed", "failed"}:
+            return task_status
+
+        task_status["stop_flag"] = True
+        task_status["stop_reason"] = stop_reason
+        if task_status.get("status") != "uploading":
+            task_status["status"] = "stopping"
+
+        timer = task_status.get("timer")
+        if timer:
+            timer.cancel()
+
+        return task_status
+
+
+def stop_task_internal(camera_id: str) -> None:
+    """超时后自动停止任务。"""
+    task_status = _set_stop_flag(camera_id, "达到最大录制时长,系统已自动停止任务")
+    if task_status:
+        logger.info(
+            "摄像头任务超时,已请求停止。摄像头: %s,任务: %s",
+            camera_id,
+            task_status.get("task_name"),
+        )
+
+
+def record_camera(camera_id: str, task_name: str, local_file: str) -> int:
+    """使用 FFmpeg 将摄像头视频流录制为浏览器兼容的 mp4 文件。"""
+    rtsp_url = settings.CAMERA_CONFIG[camera_id]
+    command = _build_ffmpeg_command(rtsp_url, local_file)
+
+    logger.info(
+        "开始使用 FFmpeg 录制摄像头视频。摄像头: %s,任务: %s,输出文件: %s,FFmpeg: %s",
+        camera_id,
+        task_name,
+        local_file,
+        command[0],
+    )
+
+    creation_flags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
+    process = subprocess.Popen(
+        command,
+        stdin=subprocess.PIPE,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.PIPE,
+        text=True,
+        encoding="utf-8",
+        errors="ignore",
+        creationflags=creation_flags,
+    )
+
+    stderr_output = ""
     try:
     try:
         while True:
         while True:
-            ret, frame = cap.read()
-            if not ret:
-                print(f"[{camera_id}] 丢帧,停止录像")
+            return_code = process.poll()
+            if return_code is not None:
                 break
                 break
-            out.write(frame)
 
 
-            # 检查任务是否被标记停止
             with lock:
             with lock:
-                status = cam_status.get(camera_id)
-                if not status or status.get("stop_flag"):
+                task_status = cam_status.get(camera_id)
+                if not task_status or task_status.get("stop_flag"):
+                    logger.info("检测到停止标记,准备优雅停止 FFmpeg。摄像头: %s,任务: %s", camera_id, task_name)
+                    if process.stdin and not process.stdin.closed:
+                        try:
+                            process.stdin.write("q\n")
+                            process.stdin.flush()
+                        except OSError:
+                            logger.info("FFmpeg 标准输入已关闭,跳过优雅停止写入。摄像头: %s,任务: %s", camera_id, task_name)
                     break
                     break
-            time.sleep(0.001)
+
+            time.sleep(0.2)
     finally:
     finally:
-        cap.release()
-        out.release()
-        print(f"[{camera_id}] 任务 {task_name} 完成,文件: {output_file}")
+        try:
+            stderr_output = process.communicate(timeout=15)[1] or ""
+        except subprocess.TimeoutExpired:
+            logger.warning("FFmpeg 未能在预期时间内退出,准备强制结束。摄像头: %s,任务: %s", camera_id, task_name)
+            process.kill()
+            stderr_output = process.communicate(timeout=5)[1] or ""
 
 
+        logger.info(
+            "FFmpeg 录像进程已结束。摄像头: %s,任务: %s,退出码: %s,本地文件: %s",
+            camera_id,
+            task_name,
+            process.returncode,
+            local_file,
+        )
 
 
-def stop_task_internal(camera_id: str):
-    with lock:
-        status = cam_status.get(camera_id)
-        if not status:
-            return
-        status["stop_flag"] = True
-        status["timer"].cancel()
+    if process.returncode != 0:
+        error_message = stderr_output.strip() or f"FFmpeg 退出码异常: {process.returncode}"
+        raise RuntimeError(f"FFmpeg 录像失败: {error_message}")
+
+    if not os.path.exists(local_file) or os.path.getsize(local_file) <= 0:
+        raise RuntimeError(f"FFmpeg 未生成有效录像文件: {local_file}")
+
+    file_size = os.path.getsize(local_file)
+    logger.info(
+        "FFmpeg 已生成浏览器兼容的 mp4 文件。摄像头: %s,任务: %s,文件大小: %s 字节",
+        camera_id,
+        task_name,
+        file_size,
+    )
+    return file_size
+
+
+def run_recording_task(camera_id: str, task_name: str, local_file: str, started_at: datetime) -> None:
+    """后台执行录像和 MinIO 上传。"""
+    try:
+        with lock:
+            task_status = cam_status.get(camera_id)
+            if task_status:
+                task_status["status"] = "recording"
+
+        record_camera(camera_id, task_name, local_file)
+
+        with lock:
+            task_status = cam_status.get(camera_id)
+            if task_status:
+                task_status["status"] = "uploading" if settings.STORAGE_TYPE == "minio" else "completed"
+
+        storage_result = get_storage_service().save_recording(camera_id, task_name, local_file, started_at)
+
+        with lock:
+            task_status = cam_status.get(camera_id)
+            if task_status:
+                task_status.update(
+                    {
+                        "status": "uploaded" if storage_result.storage_type == "minio" else "completed",
+                        "storage_type": storage_result.storage_type,
+                        "bucket": storage_result.bucket,
+                        "object_name": storage_result.object_name,
+                        "download_url": storage_result.download_url,
+                        "local_file_deleted": storage_result.local_file_deleted,
+                        "error": None,
+                    }
+                )
+                task_status_index[_build_task_key(camera_id, task_name)] = task_status
+
+        logger.info(
+            "任务处理完成。摄像头: %s,任务: %s,最终状态: %s",
+            camera_id,
+            task_name,
+            "uploaded" if storage_result.storage_type == "minio" else "completed",
+        )
+    except Exception as exc:
+        logger.exception("摄像头任务执行失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
+        with lock:
+            task_status = cam_status.get(camera_id)
+            if task_status:
+                task_status["status"] = "failed"
+                task_status["error"] = str(exc)
+                task_status_index[_build_task_key(camera_id, task_name)] = task_status
+    finally:
+        with lock:
+            task_status = cam_status.get(camera_id)
+            if task_status and task_status.get("timer"):
+                task_status["timer"].cancel()
 
 
 
 
-# -------------------- API接口 --------------------
 @router.post("/start_task")
 @router.post("/start_task")
 def start_task(req: TaskRequest):
 def start_task(req: TaskRequest):
     camera_id = req.camera_id
     camera_id = req.camera_id
     task_name = req.task_name
     task_name = req.task_name
 
 
+    logger.info("收到启动录像请求。摄像头: %s,任务: %s", camera_id, task_name)
+
     if camera_id not in settings.CAMERA_CONFIG:
     if camera_id not in settings.CAMERA_CONFIG:
         raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置")
         raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置")
 
 
-    with lock:
-        if camera_id in cam_status:
-            return {"status": "running", "task_name": cam_status[camera_id]["task_name"]}
+    try:
+        ffmpeg_executable = _get_ffmpeg_executable()
+    except RuntimeError as exc:
+        logger.error("启动录像前检查 FFmpeg 失败: %s", exc)
+        raise HTTPException(status_code=500, detail=str(exc))
 
 
-        # 输出文件名
-        now_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
-        filename = f"{camera_id}_{task_name}_{now_time}.mp4"
-        output_file = os.path.join(settings.OUTPUT_DIR, filename)
+    started_at = datetime.now()
+    local_file = _build_local_file(camera_id, task_name, started_at)
+
+    with lock:
+        existing_status = cam_status.get(camera_id)
+        if existing_status and existing_status.get("status") in active_statuses:
+            logger.info(
+                "摄像头已有任务在执行,拒绝重复启动。摄像头: %s,当前任务: %s,当前状态: %s",
+                camera_id,
+                existing_status.get("task_name"),
+                existing_status.get("status"),
+            )
+            return _build_response(existing_status)
 
 
-        # 记录状态
-        stop_flag = False
         timer = Timer(settings.MAX_TASK_SECONDS, lambda: stop_task_internal(camera_id))
         timer = Timer(settings.MAX_TASK_SECONDS, lambda: stop_task_internal(camera_id))
-        timer.start()
-        future = executor.submit(record_camera, camera_id, task_name, output_file)
         cam_status[camera_id] = {
         cam_status[camera_id] = {
+            "camera_id": camera_id,
             "task_name": task_name,
             "task_name": task_name,
-            "start_time": time.time(),
+            "start_time": started_at.strftime("%Y-%m-%d %H:%M:%S"),
+            "storage_type": settings.STORAGE_TYPE,
+            "status": "starting",
+            "local_file": local_file,
+            "bucket": None,
+            "object_name": None,
+            "download_url": None,
+            "local_file_deleted": False,
+            "stop_reason": None,
+            "error": None,
             "timer": timer,
             "timer": timer,
-            "future": future,
-            "file": output_file,
-            "stop_flag": stop_flag
+            "future": None,
+            "stop_flag": False,
         }
         }
+        task_status_index[_build_task_key(camera_id, task_name)] = cam_status[camera_id]
+
+        try:
+            future = executor.submit(run_recording_task, camera_id, task_name, local_file, started_at)
+        except Exception as exc:
+            timer.cancel()
+            cam_status.pop(camera_id, None)
+            logger.exception("提交后台录像任务失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
+            raise HTTPException(status_code=500, detail="创建录像任务失败")
+
+        cam_status[camera_id]["future"] = future
+        timer.start()
+        response = _build_response(cam_status[camera_id])
 
 
-    return {"status": "started", "file": output_file}
+    logger.info(
+        "录像任务已提交到后台线程。摄像头: %s,任务: %s,本地文件: %s,FFmpeg: %s",
+        camera_id,
+        task_name,
+        local_file,
+        ffmpeg_executable,
+    )
+    return response
 
 
 
 
 @router.post("/stop_task")
 @router.post("/stop_task")
@@ -113,30 +353,60 @@ def stop_task(req: TaskRequest):
     camera_id = req.camera_id
     camera_id = req.camera_id
     task_name = req.task_name
     task_name = req.task_name
 
 
+    logger.info("收到停止录像请求。摄像头: %s,任务: %s", camera_id, task_name)
+
     with lock:
     with lock:
-        status = cam_status.get(camera_id)
-        if not status:
-            return {"status": "not_running"}
+        task_status = cam_status.get(camera_id)
+        if not task_status:
+            return {"status": "not_running", "camera_id": camera_id}
 
 
-        if status["task_name"] != task_name:
-            return {"status": "task_mismatch", "running_task": status["task_name"]}
+        if task_status.get("task_name") != task_name:
+            return {
+                "status": "task_mismatch",
+                "camera_id": camera_id,
+                "running_task": task_status.get("task_name"),
+            }
 
 
-        # 标记停止
-        status["stop_flag"] = True
-        status["timer"].cancel()
+    task_status = _set_stop_flag(camera_id, "收到停止请求,等待录像线程安全结束")
+    if not task_status:
+        return {"status": "not_running", "camera_id": camera_id}
 
 
-    return {"status": "stopped", "file": status["file"]}
+    logger.info("已向后台录像线程发送停止信号。摄像头: %s,任务: %s", camera_id, task_name)
+    return _build_response(task_status)
 
 
 
 
 @router.get("/status/{camera_id}")
 @router.get("/status/{camera_id}")
 def status(camera_id: str):
 def status(camera_id: str):
     with lock:
     with lock:
-        status = cam_status.get(camera_id)
-        if not status:
-            return {"status": "idle"}
-        return {
-            "status": "running",
-            "task_name": status["task_name"],
-            "start_time": status["start_time"],
-            "file": status["file"]
-        }
+        task_status = cam_status.get(camera_id)
+        if not task_status:
+            return {"status": "idle", "camera_id": camera_id}
+        return _build_response(task_status)
+
+
+@router.post("/query_task")
+def query_task(req: TaskRequest):
+    """按 camera_id 和 task_name 查询任务状态。"""
+    camera_id = req.camera_id
+    task_name = req.task_name
+
+    logger.info("收到任务查询请求。摄像头: %s,任务: %s", camera_id, task_name)
+
+    with lock:
+        task_status = task_status_index.get(_build_task_key(camera_id, task_name))
+        if not task_status:
+            return {
+                "status": "not_found",
+                "camera_id": camera_id,
+                "task_name": task_name,
+            }
+
+        response = _build_response(task_status)
+
+    logger.info(
+        "任务查询完成。摄像头: %s,任务: %s,当前状态: %s",
+        camera_id,
+        task_name,
+        response.get("status"),
+    )
+    return response

+ 45 - 4
app/core/config.py

@@ -1,13 +1,54 @@
-class Settings():
+import os
+
+
+def _get_bool(name: str, default: bool) -> bool:
+    value = os.getenv(name)
+    if value is None:
+        return default
+    return value.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _get_int(name: str, default: int) -> int:
+    value = os.getenv(name)
+    if value is None:
+        return default
+    try:
+        return int(value)
+    except ValueError:
+        return default
+
+
+class Settings:
     CAMERA_CONFIG = {
     CAMERA_CONFIG = {
-        "cam01": "rtsp://admin:password@192.168.1.10:554/live/ch0",
+        "cam01": "rtsp://admin:password@192.168.77.10:554/live/ch0",
         # 可扩展多个摄像头
         # 可扩展多个摄像头
     }
     }
 
 
-    OUTPUT_DIR = "./records"
+    OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./records")
+    MAX_TASK_SECONDS = _get_int("MAX_TASK_SECONDS", 60 * 10)  # 10 分钟超时
+
+    # 日志配置
+    LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
+
+    # 存储配置
+    STORAGE_TYPE = os.getenv("STORAGE_TYPE", "minio").strip().lower()
+    # 上传后是否删掉本地视频
+    DELETE_LOCAL_AFTER_UPLOAD = _get_bool("DELETE_LOCAL_AFTER_UPLOAD", False)
 
 
+    # MinIO 配置,默认值对齐你提供的测试代码
+    MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "192.168.77.249:9000")
+    MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "pZEwCGnpNN05KPnmC2Yh")
+    MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4")
+    MINIO_SECURE = _get_bool("MINIO_SECURE", False)
+    MINIO_BUCKET = os.getenv("MINIO_BUCKET", "grading")
+    MINIO_BASE_PREFIX = os.getenv("MINIO_BASE_PREFIX", "monitor_video_data").strip("/\\")
+    MINIO_AUTO_CREATE_BUCKET = _get_bool("MINIO_AUTO_CREATE_BUCKET", True)
 
 
-    MAX_TASK_SECONDS = 60 * 10  # 10分钟超时
+    # FFmpeg 录像配置
+    FFMPEG_PATH = os.getenv("FFMPEG_PATH", "ffmpeg")
+    FFMPEG_RTSP_TRANSPORT = os.getenv("FFMPEG_RTSP_TRANSPORT", "tcp")
+    FFMPEG_PRESET = os.getenv("FFMPEG_PRESET", "veryfast")
+    FFMPEG_LOGLEVEL = os.getenv("FFMPEG_LOGLEVEL", "error")
 
 
 
 
 settings = Settings()
 settings = Settings()

+ 13 - 0
app/core/logging_config.py

@@ -0,0 +1,13 @@
+import logging
+
+from app.core.config import settings
+
+
+def setup_logging() -> None:
+    """初始化全局日志配置。"""
+    log_level = getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO)
+    logging.basicConfig(
+        level=log_level,
+        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
+        force=True,
+    )

+ 11 - 5
app/main.py

@@ -1,12 +1,19 @@
+import logging
+import os
+
 from fastapi import FastAPI
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.middleware.cors import CORSMiddleware
-import os
-from app.core.config import settings
 
 
 from app.api import camera
 from app.api import camera
+from app.core.config import settings
+from app.core.logging_config import setup_logging
 
 
+setup_logging()
+logger = logging.getLogger(__name__)
 
 
 os.makedirs(settings.OUTPUT_DIR, exist_ok=True)
 os.makedirs(settings.OUTPUT_DIR, exist_ok=True)
+logger.info("已准备本地录像目录: %s", os.path.abspath(settings.OUTPUT_DIR))
+logger.info("当前存储模式: %s", settings.STORAGE_TYPE)
 
 
 app = FastAPI(title="监控摄像头服务")
 app = FastAPI(title="监控摄像头服务")
 app.add_middleware(
 app.add_middleware(
@@ -14,9 +21,8 @@ app.add_middleware(
     allow_origins=["*"],
     allow_origins=["*"],
     allow_credentials=True,
     allow_credentials=True,
     allow_methods=["*"],
     allow_methods=["*"],
-    allow_headers=["*"]
+    allow_headers=["*"],
 )
 )
 
 
-
 # 注册路由
 # 注册路由
-app.include_router(camera.router, prefix="/api/camera", tags=["Camera"])
+app.include_router(camera.router, prefix="/api/monitor", tags=["Camera"])

+ 3 - 0
app/services/__init__.py

@@ -0,0 +1,3 @@
+from app.services.storage import get_storage_service
+
+__all__ = ["get_storage_service"]

+ 151 - 0
app/services/storage.py

@@ -0,0 +1,151 @@
+import logging
+import os
+import re
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
+
+from app.core.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+def sanitize_name(name: str) -> str:
+    """清理路径片段,避免把特殊字符带入对象名或本地文件名。"""
+    cleaned_name = re.sub(r'[\s<>:"/\\|?*]+', "_", str(name)).strip("._ ")
+    return cleaned_name or "unnamed"
+
+
+@dataclass
+class StorageResult:
+    storage_type: str
+    local_file: str
+    bucket: Optional[str] = None
+    object_name: Optional[str] = None
+    download_url: Optional[str] = None
+    local_file_deleted: bool = False
+
+
+class BaseStorageService:
+    def save_recording(
+        self,
+        camera_id: str,
+        task_name: str,
+        local_file: str,
+        started_at: datetime,
+    ) -> StorageResult:
+        raise NotImplementedError
+
+
+class LocalStorageService(BaseStorageService):
+    def save_recording(
+        self,
+        camera_id: str,
+        task_name: str,
+        local_file: str,
+        started_at: datetime,
+    ) -> StorageResult:
+        logger.info("使用本地存储模式,录像文件保留在本地: %s", local_file)
+        return StorageResult(
+            storage_type="local",
+            local_file=local_file,
+        )
+
+
+class MinioStorageService(BaseStorageService):
+    def __init__(self) -> None:
+        try:
+            from minio import Minio
+        except ImportError as exc:
+            raise RuntimeError("未安装 minio 依赖,请先执行 pip install minio") from exc
+
+        self.client = Minio(
+            settings.MINIO_ENDPOINT,
+            access_key=settings.MINIO_ACCESS_KEY,
+            secret_key=settings.MINIO_SECRET_KEY,
+            secure=settings.MINIO_SECURE,
+        )
+
+    def _build_object_name(self, camera_id: str, task_name: str, started_at: datetime, local_file: str) -> str:
+        """按日期分层生成对象名。"""
+        safe_task_name = sanitize_name(task_name)
+        file_ext = os.path.splitext(local_file)[1] or ".mp4"
+        timestamp = started_at.strftime("%Y%m%d_%H%M%S")
+        date_path = started_at.strftime("%Y/%m/%d")
+        base_prefix = settings.MINIO_BASE_PREFIX.strip("/\\")
+        return (
+            f"{base_prefix}/{sanitize_name(camera_id)}/{date_path}/{safe_task_name}_{timestamp}{file_ext}"
+        ).replace("\\", "/")
+
+    def _ensure_bucket_exists(self) -> None:
+        """按配置检查并自动创建桶。"""
+        bucket_name = settings.MINIO_BUCKET
+        if self.client.bucket_exists(bucket_name):
+            return
+
+        if not settings.MINIO_AUTO_CREATE_BUCKET:
+            raise RuntimeError(f"MinIO 存储桶不存在: {bucket_name}")
+
+        self.client.make_bucket(bucket_name)
+        logger.info("MinIO 存储桶不存在,已自动创建: %s", bucket_name)
+
+    def _build_full_url(self, object_name: str) -> str:
+        scheme = "https" if settings.MINIO_SECURE else "http"
+        return f"{scheme}://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET}/{object_name}"
+
+    def save_recording(
+        self,
+        camera_id: str,
+        task_name: str,
+        local_file: str,
+        started_at: datetime,
+    ) -> StorageResult:
+        self._ensure_bucket_exists()
+        object_name = self._build_object_name(camera_id, task_name, started_at, local_file)
+
+        logger.info(
+            "开始上传录像到 MinIO,摄像头: %s,任务: %s,对象路径: %s",
+            camera_id,
+            task_name,
+            object_name,
+        )
+        self.client.fput_object(
+            settings.MINIO_BUCKET,
+            object_name,
+            local_file,
+            content_type="video/mp4",
+        )
+        logger.info("录像上传完成,摄像头: %s,对象路径: %s", camera_id, object_name)
+
+        local_file_deleted = False
+        if settings.DELETE_LOCAL_AFTER_UPLOAD and os.path.exists(local_file):
+            os.remove(local_file)
+            local_file_deleted = True
+            logger.info("已删除本地录像文件: %s", local_file)
+
+        return StorageResult(
+            storage_type="minio",
+            local_file=local_file,
+            bucket=settings.MINIO_BUCKET,
+            object_name=object_name,
+            download_url=self._build_full_url(object_name),
+            local_file_deleted=local_file_deleted,
+        )
+
+
+_storage_service: Optional[BaseStorageService] = None
+
+
+def get_storage_service() -> BaseStorageService:
+    global _storage_service
+
+    if _storage_service is not None:
+        return _storage_service
+
+    if settings.STORAGE_TYPE == "minio":
+        _storage_service = MinioStorageService()
+    else:
+        _storage_service = LocalStorageService()
+
+    logger.info("已初始化存储服务,当前模式: %s", settings.STORAGE_TYPE)
+    return _storage_service

+ 11 - 3
run_MonitorCameraServer.py

@@ -1,8 +1,16 @@
-import uvicorn
+import logging
 import socket
 import socket
 
 
+import uvicorn
+
+from app.core.logging_config import setup_logging
+
+
 if __name__ == "__main__":
 if __name__ == "__main__":
+    setup_logging()
+    logger = logging.getLogger(__name__)
+
     host_ip = socket.gethostbyname(socket.gethostname())
     host_ip = socket.gethostbyname(socket.gethostname())
     port = 7961
     port = 7961
-    print(f"http://{host_ip}:{port}/docs")
-    uvicorn.run("app.main:app", host="0.0.0.0", port=port)
+    logger.info("服务文档地址: http://%s:%s/docs", host_ip, port)
+    uvicorn.run("app.main:app", host="0.0.0.0", port=port)