import logging import os import re from dataclasses import dataclass from datetime import datetime from typing import Optional from app.core.config import settings logger = logging.getLogger(__name__) def sanitize_name(name: str) -> str: """清理路径片段,避免把特殊字符带入对象名或本地文件名。""" cleaned_name = re.sub(r'[\s<>:"/\\|?*]+', "_", str(name)).strip("._ ") return cleaned_name or "unnamed" @dataclass class StorageResult: storage_type: str local_file: str bucket: Optional[str] = None object_name: Optional[str] = None download_url: Optional[str] = None local_file_deleted: bool = False class BaseStorageService: def save_recording( self, camera_id: str, task_name: str, local_file: str, started_at: datetime, ) -> StorageResult: raise NotImplementedError class LocalStorageService(BaseStorageService): def save_recording( self, camera_id: str, task_name: str, local_file: str, started_at: datetime, ) -> StorageResult: logger.info("使用本地存储模式,录像文件保留在本地: %s", local_file) return StorageResult( storage_type="local", local_file=local_file, ) class MinioStorageService(BaseStorageService): def __init__(self) -> None: try: from minio import Minio except ImportError as exc: raise RuntimeError("未安装 minio 依赖,请先执行 pip install minio") from exc self.client = Minio( settings.MINIO_ENDPOINT, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) def _build_object_name(self, camera_id: str, task_name: str, started_at: datetime, local_file: str) -> str: """按日期分层生成对象名。""" safe_task_name = sanitize_name(task_name) file_ext = os.path.splitext(local_file)[1] or ".mp4" timestamp = started_at.strftime("%Y%m%d_%H%M%S") date_path = started_at.strftime("%Y/%m/%d") base_prefix = settings.MINIO_BASE_PREFIX.strip("/\\") return ( f"{base_prefix}/{sanitize_name(camera_id)}/{date_path}/{safe_task_name}_{timestamp}{file_ext}" ).replace("\\", "/") def _ensure_bucket_exists(self) -> None: """按配置检查并自动创建桶。""" bucket_name = settings.MINIO_BUCKET if self.client.bucket_exists(bucket_name): return if not settings.MINIO_AUTO_CREATE_BUCKET: raise RuntimeError(f"MinIO 存储桶不存在: {bucket_name}") self.client.make_bucket(bucket_name) logger.info("MinIO 存储桶不存在,已自动创建: %s", bucket_name) def _build_full_url(self, object_name: str) -> str: scheme = "https" if settings.MINIO_SECURE else "http" return f"{scheme}://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET}/{object_name}" def save_recording( self, camera_id: str, task_name: str, local_file: str, started_at: datetime, ) -> StorageResult: self._ensure_bucket_exists() object_name = self._build_object_name(camera_id, task_name, started_at, local_file) logger.info( "开始上传录像到 MinIO,摄像头: %s,任务: %s,对象路径: %s", camera_id, task_name, object_name, ) self.client.fput_object( settings.MINIO_BUCKET, object_name, local_file, content_type="video/mp4", ) logger.info("录像上传完成,摄像头: %s,对象路径: %s", camera_id, object_name) local_file_deleted = False if settings.DELETE_LOCAL_AFTER_UPLOAD and os.path.exists(local_file): os.remove(local_file) local_file_deleted = True logger.info("已删除本地录像文件: %s", local_file) return StorageResult( storage_type="minio", local_file=local_file, bucket=settings.MINIO_BUCKET, object_name=object_name, download_url=self._build_full_url(object_name), local_file_deleted=local_file_deleted, ) _storage_service: Optional[BaseStorageService] = None def get_storage_service() -> BaseStorageService: global _storage_service if _storage_service is not None: return _storage_service if settings.STORAGE_TYPE == "minio": _storage_service = MinioStorageService() else: _storage_service = LocalStorageService() logger.info("已初始化存储服务,当前模式: %s", settings.STORAGE_TYPE) return _storage_service