storage.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import logging
  2. import os
  3. import re
  4. from dataclasses import dataclass
  5. from datetime import datetime
  6. from typing import Optional
  7. from app.core.config import settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
  9. def sanitize_name(name: str) -> str:
  10. """清理路径片段,避免把特殊字符带入对象名或本地文件名。"""
  11. cleaned_name = re.sub(r'[\s<>:"/\\|?*]+', "_", str(name)).strip("._ ")
  12. return cleaned_name or "unnamed"
  13. @dataclass
  14. class StorageResult:
  15. storage_type: str
  16. local_file: str
  17. bucket: Optional[str] = None
  18. object_name: Optional[str] = None
  19. download_url: Optional[str] = None
  20. local_file_deleted: bool = False
  21. class BaseStorageService:
  22. def save_recording(
  23. self,
  24. camera_id: str,
  25. task_name: str,
  26. local_file: str,
  27. started_at: datetime,
  28. ) -> StorageResult:
  29. raise NotImplementedError
  30. class LocalStorageService(BaseStorageService):
  31. def save_recording(
  32. self,
  33. camera_id: str,
  34. task_name: str,
  35. local_file: str,
  36. started_at: datetime,
  37. ) -> StorageResult:
  38. logger.info("使用本地存储模式,录像文件保留在本地: %s", local_file)
  39. return StorageResult(
  40. storage_type="local",
  41. local_file=local_file,
  42. )
  43. class MinioStorageService(BaseStorageService):
  44. def __init__(self) -> None:
  45. try:
  46. from minio import Minio
  47. except ImportError as exc:
  48. raise RuntimeError("未安装 minio 依赖,请先执行 pip install minio") from exc
  49. self.client = Minio(
  50. settings.MINIO_ENDPOINT,
  51. access_key=settings.MINIO_ACCESS_KEY,
  52. secret_key=settings.MINIO_SECRET_KEY,
  53. secure=settings.MINIO_SECURE,
  54. )
  55. def _build_object_name(self, camera_id: str, task_name: str, started_at: datetime, local_file: str) -> str:
  56. """按日期分层生成对象名。"""
  57. safe_task_name = sanitize_name(task_name)
  58. file_ext = os.path.splitext(local_file)[1] or ".mp4"
  59. timestamp = started_at.strftime("%Y%m%d_%H%M%S")
  60. date_path = started_at.strftime("%Y/%m/%d")
  61. base_prefix = settings.MINIO_BASE_PREFIX.strip("/\\")
  62. return (
  63. f"{base_prefix}/{sanitize_name(camera_id)}/{date_path}/{safe_task_name}_{timestamp}{file_ext}"
  64. ).replace("\\", "/")
  65. def _ensure_bucket_exists(self) -> None:
  66. """按配置检查并自动创建桶。"""
  67. bucket_name = settings.MINIO_BUCKET
  68. if self.client.bucket_exists(bucket_name):
  69. return
  70. if not settings.MINIO_AUTO_CREATE_BUCKET:
  71. raise RuntimeError(f"MinIO 存储桶不存在: {bucket_name}")
  72. self.client.make_bucket(bucket_name)
  73. logger.info("MinIO 存储桶不存在,已自动创建: %s", bucket_name)
  74. def _build_full_url(self, object_name: str) -> str:
  75. scheme = "https" if settings.MINIO_SECURE else "http"
  76. return f"{scheme}://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET}/{object_name}"
  77. def save_recording(
  78. self,
  79. camera_id: str,
  80. task_name: str,
  81. local_file: str,
  82. started_at: datetime,
  83. ) -> StorageResult:
  84. self._ensure_bucket_exists()
  85. object_name = self._build_object_name(camera_id, task_name, started_at, local_file)
  86. logger.info(
  87. "开始上传录像到 MinIO,摄像头: %s,任务: %s,对象路径: %s",
  88. camera_id,
  89. task_name,
  90. object_name,
  91. )
  92. self.client.fput_object(
  93. settings.MINIO_BUCKET,
  94. object_name,
  95. local_file,
  96. content_type="video/mp4",
  97. )
  98. logger.info("录像上传完成,摄像头: %s,对象路径: %s", camera_id, object_name)
  99. local_file_deleted = False
  100. if settings.DELETE_LOCAL_AFTER_UPLOAD and os.path.exists(local_file):
  101. os.remove(local_file)
  102. local_file_deleted = True
  103. logger.info("已删除本地录像文件: %s", local_file)
  104. return StorageResult(
  105. storage_type="minio",
  106. local_file=local_file,
  107. bucket=settings.MINIO_BUCKET,
  108. object_name=object_name,
  109. download_url=self._build_full_url(object_name),
  110. local_file_deleted=local_file_deleted,
  111. )
  112. _storage_service: Optional[BaseStorageService] = None
  113. def get_storage_service() -> BaseStorageService:
  114. global _storage_service
  115. if _storage_service is not None:
  116. return _storage_service
  117. if settings.STORAGE_TYPE == "minio":
  118. _storage_service = MinioStorageService()
  119. else:
  120. _storage_service = LocalStorageService()
  121. logger.info("已初始化存储服务,当前模式: %s", settings.STORAGE_TYPE)
  122. return _storage_service