| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- import logging
- import os
- import re
- from dataclasses import dataclass
- from datetime import datetime
- from typing import Optional
- from app.core.config import settings
- logger = logging.getLogger(__name__)
def sanitize_name(name: str) -> str:
    """Turn an arbitrary value into a safe path/object-name fragment.

    Every run of whitespace or of the punctuation listed in the pattern
    (angle brackets, colon, double quote, both slashes, pipe, question
    mark, asterisk) collapses into a single underscore. Leading and
    trailing dots, underscores and spaces are then trimmed.

    Args:
        name: Value to clean; it is converted with ``str()`` first.

    Returns:
        The cleaned fragment, or ``"unnamed"`` when nothing is left.
    """
    text = str(name)
    replaced = re.sub(r'[\s<>:"/\\|?*]+', "_", text)
    trimmed = replaced.strip("._ ")
    if not trimmed:
        return "unnamed"
    return trimmed
@dataclass
class StorageResult:
    """Describes where a recording ended up after a storage service ran."""

    # Backend identifier; the services in this module set "local" or "minio".
    storage_type: str
    # Path of the recording on local disk, as handed to the service.
    local_file: str
    # Remote bucket name; stays None when nothing was uploaded.
    bucket: Optional[str] = None
    # Key of the uploaded object inside the bucket; None without an upload.
    object_name: Optional[str] = None
    # URL of the uploaded object; None without an upload.
    download_url: Optional[str] = None
    # True only when the local file was removed after the upload.
    local_file_deleted: bool = False
class BaseStorageService:
    """Common interface for the recording storage backends."""

    def save_recording(
        self, camera_id: str, task_name: str, local_file: str, started_at: datetime
    ) -> StorageResult:
        """Persist one finished recording; concrete backends must override.

        Args:
            camera_id: Identifier of the camera that produced the recording.
            task_name: Name of the recording task.
            local_file: Path of the recording file on local disk.
            started_at: Time at which the recording started.

        Returns:
            A ``StorageResult`` describing where the recording was stored.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class LocalStorageService(BaseStorageService):
    """Backend that leaves the recording where it already is on local disk."""

    def save_recording(
        self, camera_id: str, task_name: str, local_file: str, started_at: datetime
    ) -> StorageResult:
        """Report the recording as stored locally without touching the file.

        Args:
            camera_id: Identifier of the source camera (unused here).
            task_name: Name of the recording task (unused here).
            local_file: Path of the recording file on local disk.
            started_at: Recording start time (unused here).

        Returns:
            A ``StorageResult`` with ``storage_type="local"`` and the
            unchanged ``local_file``; all remote fields keep their defaults.
        """
        logger.info("使用本地存储模式,录像文件保留在本地: %s", local_file)
        result = StorageResult(local_file=local_file, storage_type="local")
        return result
class MinioStorageService(BaseStorageService):
    """Backend that uploads finished recordings to a MinIO bucket.

    Endpoint, credentials, bucket, object prefix and the
    delete-after-upload policy are all read from ``settings``.
    """

    def __init__(self) -> None:
        """Create the MinIO client from ``settings``.

        Raises:
            RuntimeError: If the ``minio`` package is not installed.
        """
        # Imported here rather than at module level, so importing this module
        # does not itself require the minio package.
        try:
            from minio import Minio
        except ImportError as exc:
            raise RuntimeError("未安装 minio 依赖,请先执行 pip install minio") from exc

        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=settings.MINIO_SECURE,
        )

    def _build_object_name(
        self, camera_id: str, task_name: str, started_at: datetime, local_file: str
    ) -> str:
        """Build the date-partitioned object key for a recording.

        Layout: ``<prefix>/<camera>/<YYYY>/<MM>/<DD>/<task>_<YYYYMMDD_HHMMSS><ext>``.
        The extension comes from ``local_file`` and defaults to ``.mp4``.
        An empty ``MINIO_BASE_PREFIX`` is left out entirely so the key never
        starts with a slash.
        """
        file_ext = os.path.splitext(local_file)[1] or ".mp4"
        timestamp = started_at.strftime("%Y%m%d_%H%M%S")
        base_prefix = (settings.MINIO_BASE_PREFIX or "").strip("/\\")
        segments = [
            base_prefix,
            sanitize_name(camera_id),
            started_at.strftime("%Y/%m/%d"),
            f"{sanitize_name(task_name)}_{timestamp}{file_ext}",
        ]
        # Only the prefix can be empty; skipping it avoids a leading "/".
        key = "/".join(segment for segment in segments if segment)
        return key.replace("\\", "/")

    def _ensure_bucket_exists(self) -> None:
        """Make sure the configured bucket exists, creating it when allowed.

        Raises:
            RuntimeError: If the bucket is missing and
                ``settings.MINIO_AUTO_CREATE_BUCKET`` is falsy.
        """
        bucket_name = settings.MINIO_BUCKET
        if self.client.bucket_exists(bucket_name):
            return
        if not settings.MINIO_AUTO_CREATE_BUCKET:
            raise RuntimeError(f"MinIO 存储桶不存在: {bucket_name}")
        self.client.make_bucket(bucket_name)
        logger.info("MinIO 存储桶不存在,已自动创建: %s", bucket_name)

    def _build_full_url(self, object_name: str) -> str:
        """Return the URL of ``object_name`` on the configured endpoint.

        The object key is percent-encoded (slashes kept) so that keys
        containing characters such as ``#``, ``%`` or non-ASCII text still
        produce a valid URL.
        """
        from urllib.parse import quote

        scheme = "https" if settings.MINIO_SECURE else "http"
        encoded_name = quote(object_name, safe="/")
        return f"{scheme}://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET}/{encoded_name}"

    def save_recording(
        self,
        camera_id: str,
        task_name: str,
        local_file: str,
        started_at: datetime,
    ) -> StorageResult:
        """Upload a recording to MinIO and optionally delete the local copy.

        Args:
            camera_id: Identifier of the camera that produced the recording.
            task_name: Name of the recording task.
            local_file: Path of the recording file on local disk.
            started_at: Recording start time; drives the date folders and
                the timestamp in the object key.

        Returns:
            A ``StorageResult`` with the bucket, object key and URL, and with
            ``local_file_deleted`` telling whether the local copy was removed.

        Raises:
            RuntimeError: If the bucket is missing and may not be created.
            Exception: Whatever the MinIO client raises during the upload.
        """
        import mimetypes

        self._ensure_bucket_exists()
        object_name = self._build_object_name(camera_id, task_name, started_at, local_file)

        # Follow the real file extension, but accept only video types:
        # platform MIME tables map some video extensions (e.g. ".ts") to
        # unrelated types, in which case the previous default is kept.
        guessed_type = mimetypes.guess_type(local_file)[0]
        if guessed_type and guessed_type.startswith("video/"):
            content_type = guessed_type
        else:
            content_type = "video/mp4"

        logger.info(
            "开始上传录像到 MinIO,摄像头: %s,任务: %s,对象路径: %s",
            camera_id,
            task_name,
            object_name,
        )
        self.client.fput_object(
            settings.MINIO_BUCKET,
            object_name,
            local_file,
            content_type=content_type,
        )
        logger.info("录像上传完成,摄像头: %s,对象路径: %s", camera_id, object_name)

        local_file_deleted = False
        if settings.DELETE_LOCAL_AFTER_UPLOAD:
            try:
                os.remove(local_file)
            except FileNotFoundError:
                # Already gone: nothing to clean up.
                pass
            except OSError:
                # The upload succeeded, so a cleanup failure must not discard
                # the result; it is reported via local_file_deleted=False.
                logger.warning(
                    "上传已完成,但删除本地录像文件失败: %s",
                    local_file,
                    exc_info=True,
                )
            else:
                local_file_deleted = True
                logger.info("已删除本地录像文件: %s", local_file)

        return StorageResult(
            storage_type="minio",
            local_file=local_file,
            bucket=settings.MINIO_BUCKET,
            object_name=object_name,
            download_url=self._build_full_url(object_name),
            local_file_deleted=local_file_deleted,
        )
# Lazily created service instance, cached at module level.
_storage_service: Optional[BaseStorageService] = None


def get_storage_service() -> BaseStorageService:
    """Return the cached storage service, creating it on first call.

    ``settings.STORAGE_TYPE`` is compared case-insensitively and with
    surrounding whitespace ignored: ``"minio"`` selects
    ``MinioStorageService``, every other value selects
    ``LocalStorageService``. A value that is neither ``"minio"`` nor
    ``"local"`` logs a warning, so a typo does not silently change where
    recordings go. No locking is done around the first-call initialization.

    Returns:
        The module-level storage service instance.

    Raises:
        RuntimeError: If MinIO is selected but its client cannot be created.
    """
    global _storage_service
    if _storage_service is not None:
        return _storage_service

    configured = settings.STORAGE_TYPE
    # Normalize plain strings only; any other value is compared unchanged.
    mode = configured.strip().lower() if isinstance(configured, str) else configured

    if mode == "minio":
        _storage_service = MinioStorageService()
        active_mode = "minio"
    else:
        if mode != "local":
            # NOTE(review): assumes "local" is the configured name for local
            # storage — confirm against the settings definition.
            logger.warning("未识别的存储类型 %r,已回退为本地存储", configured)
        _storage_service = LocalStorageService()
        active_mode = "local"

    # Log the backend actually in use, not the raw configured value.
    logger.info("已初始化存储服务,当前模式: %s", active_mode)
    return _storage_service
|