file_ops.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import os
  2. import time
  3. import uuid
  4. import aiofiles
  5. from fastapi import UploadFile
  6. from app.core.config import settings
  7. async def save_upload_file(file: UploadFile) -> str:
  8. """
  9. 保存上传的文件,返回文件名。
  10. 文件名格式: timestamp_uuid.ext
  11. """
  12. # 1. 确保目录存在
  13. if not os.path.exists(settings.UPLOAD_PATH):
  14. os.makedirs(settings.UPLOAD_PATH)
  15. # 2. 生成唯一文件名
  16. # 获取扩展名 (如 .jpg)
  17. filename = file.filename
  18. ext = os.path.splitext(filename)[1] if filename else ".jpg"
  19. # 使用 时间戳 + UUID 防止重名
  20. unique_name = f"{int(time.time())}_{uuid.uuid4().hex[:8]}{ext}"
  21. file_path = os.path.join(settings.UPLOAD_PATH, unique_name)
  22. # 3. 异步写入磁盘
  23. async with aiofiles.open(file_path, 'wb') as out_file:
  24. # 即使是 UploadFile,读取 content 也是异步流
  25. content = await file.read()
  26. await out_file.write(content)
  27. return unique_name
  28. def delete_file(filename: str) -> bool:
  29. """删除指定文件"""
  30. file_path = os.path.join(settings.UPLOAD_PATH, filename)
  31. if os.path.exists(file_path):
  32. os.remove(file_path)
  33. return True
  34. return False