| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- import os
- import time
- import uuid
- import aiofiles
- from fastapi import UploadFile
- from app.core.config import settings
async def save_upload_file(file: UploadFile) -> str:
    """Save an uploaded file under ``settings.UPLOAD_PATH`` and return its name.

    The stored name has the form ``<unix_timestamp>_<8 hex chars><ext>``.
    ``ext`` is the extension of the client-supplied filename (possibly empty
    when that name has no extension); ``.jpg`` is used only when the upload
    carries no filename at all.

    Args:
        file: The incoming upload; it is read from its current position to
            the end.

    Returns:
        The generated file name (not the full path) inside
        ``settings.UPLOAD_PATH``.
    """
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(settings.UPLOAD_PATH, exist_ok=True)

    # Keep only the extension of the client-supplied name; the rest of the
    # stored name is generated here.
    original_name = file.filename
    ext = os.path.splitext(original_name)[1] if original_name else ".jpg"

    # Timestamp + random UUID fragment keeps names unique.
    unique_name = f"{int(time.time())}_{uuid.uuid4().hex[:8]}{ext}"
    file_path = os.path.join(settings.UPLOAD_PATH, unique_name)

    # Stream in bounded chunks so a large upload is never held in memory
    # in one piece.
    chunk_size = 1024 * 1024
    async with aiofiles.open(file_path, "wb") as out_file:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            await out_file.write(chunk)

    return unique_name
def delete_file(filename: str) -> bool:
    """Delete a file from ``settings.UPLOAD_PATH``.

    Args:
        filename: Name of the file, relative to ``settings.UPLOAD_PATH``.

    Returns:
        True if the file was removed. False if the name is empty, the file
        does not exist, or the name resolves to a location outside
        ``settings.UPLOAD_PATH``.
    """
    if not filename:
        return False

    base_dir = os.path.abspath(settings.UPLOAD_PATH)
    file_path = os.path.abspath(os.path.join(base_dir, filename))

    # Refuse absolute paths and ``..`` components that would escape the
    # upload directory; this function must never delete files elsewhere.
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # Raised e.g. when the two paths are on different Windows drives.
        inside_base = False
    if not inside_base:
        return False

    # Attempt the removal directly instead of checking existence first, so a
    # concurrent delete cannot turn into an unhandled FileNotFoundError.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        return False
    return True
|