import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI import time import logging # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 创建一个 asyncio.Event 来作为关闭信号 shutdown_event = asyncio.Event() async def my_background_worker(name: str): """ 一个模拟的后台工作任务,它会持续运行直到收到关闭信号。 """ logging.info(f"Worker '{name}' starting.") count = 0 while not shutdown_event.is_set(): try: # 你的业务逻辑 logging.info(f"Worker '{name}' is running, count: {count}") count += 1 # 使用 asyncio.sleep 而不是 time.sleep,避免阻塞事件循环 await asyncio.sleep(2) except asyncio.CancelledError: # 当任务被取消时,正常退出循环 logging.warning(f"Worker '{name}' received cancellation request. Stopping.") break logging.info(f"Worker '{name}' has stopped gracefully.") @asynccontextmanager async def lifespan(app: FastAPI): logging.info("Application startup...") # 在应用启动时,创建并启动后台任务 # 将 task 存储在 app.state 中,以便在关闭时可以访问它 app.state.worker_task = None yield # 应用在此处运行 logging.info("Application shutdown...") # 1. 发送关闭信号 logging.info("Signaling worker to shut down.") shutdown_event.set() # 2. 等待后台任务完成 try: # 给一个超时时间,防止无限期等待 await asyncio.wait_for(app.state.worker_task, timeout=5.0) logging.info("Worker task has been awaited.") except asyncio.TimeoutError: logging.error("Worker task did not finish in time, cancelling.") app.state.worker_task.cancel() # 如果超时,强制取消 # 等待取消操作完成 await asyncio.gather(app.state.worker_task, return_exceptions=True) except Exception as e: logging.error(f"An error occurred during worker shutdown: {e}") app = FastAPI(lifespan=lifespan) @app.get("/") async def root(): app.state.worker_task = asyncio.create_task(my_background_worker("Worker-1")) return {"message": "Hello World"} if __name__ == '__main__': import uvicorn uvicorn.run(app)