import os
import time
import uuid

import aiofiles
from fastapi import UploadFile

from app.core.config import settings

# Size of each read/write step when streaming an upload to disk.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


async def save_upload_file(file: UploadFile) -> str:
    """Save an uploaded file under ``settings.UPLOAD_PATH``.

    The stored name has the form ``<unix timestamp>_<8 hex chars><ext>``,
    where ``<ext>`` is the extension of the client-supplied filename
    (``.jpg`` when the upload carries no filename at all).

    Args:
        file: The incoming upload to persist.

    Returns:
        The generated file name (not the full path) of the stored file.
    """
    # 1. Make sure the target directory exists. ``exist_ok=True`` avoids the
    #    check-then-create race when several uploads arrive concurrently.
    os.makedirs(settings.UPLOAD_PATH, exist_ok=True)

    # 2. Build a unique file name, keeping the original extension (e.g. .jpg).
    filename = file.filename
    ext = os.path.splitext(filename)[1] if filename else ".jpg"

    # Timestamp + random UUID fragment guards against name collisions.
    unique_name = f"{int(time.time())}_{uuid.uuid4().hex[:8]}{ext}"
    file_path = os.path.join(settings.UPLOAD_PATH, unique_name)

    # 3. Stream the upload to disk in chunks so a large file is never held
    #    in memory in its entirety.
    async with aiofiles.open(file_path, "wb") as out_file:
        while True:
            chunk = await file.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            await out_file.write(chunk)

    return unique_name


def delete_file(filename: str) -> bool:
    """Delete a previously stored file from ``settings.UPLOAD_PATH``.

    Args:
        filename: Name of the file, relative to the upload directory.

    Returns:
        True if the file was removed; False if it does not exist or if
        ``filename`` is empty, malformed, or points outside the upload
        directory.
    """
    # An empty name would resolve to the upload directory itself, and an
    # embedded NUL byte can never name a real file.
    if not filename or "\x00" in filename:
        return False

    upload_root = os.path.abspath(settings.UPLOAD_PATH)
    file_path = os.path.abspath(os.path.join(upload_root, filename))

    # Refuse absolute paths and ``..`` segments that escape the upload
    # directory; otherwise a caller-controlled name could delete any file.
    try:
        inside_root = os.path.commonpath([upload_root, file_path]) == upload_root
    except ValueError:
        # Raised e.g. when the two paths are on different Windows drives.
        inside_root = False
    if not inside_root or file_path == upload_root:
        return False

    # Remove directly instead of exists()+remove() so a concurrent deletion
    # cannot make this raise between the check and the removal.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        return False
    return True