| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import asyncio
- from contextlib import asynccontextmanager
- from fastapi import FastAPI
- import time
- import logging
- # 配置日志
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- # 创建一个 asyncio.Event 来作为关闭信号
- shutdown_event = asyncio.Event()
- async def my_background_worker(name: str):
- """
- 一个模拟的后台工作任务,它会持续运行直到收到关闭信号。
- """
- logging.info(f"Worker '{name}' starting.")
- count = 0
- while not shutdown_event.is_set():
- try:
- # 你的业务逻辑
- logging.info(f"Worker '{name}' is running, count: {count}")
- count += 1
- # 使用 asyncio.sleep 而不是 time.sleep,避免阻塞事件循环
- await asyncio.sleep(2)
- except asyncio.CancelledError:
- # 当任务被取消时,正常退出循环
- logging.warning(f"Worker '{name}' received cancellation request. Stopping.")
- break
- logging.info(f"Worker '{name}' has stopped gracefully.")
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- logging.info("Application startup...")
- # 在应用启动时,创建并启动后台任务
- # 将 task 存储在 app.state 中,以便在关闭时可以访问它
- app.state.worker_task = None
- yield # 应用在此处运行
- logging.info("Application shutdown...")
- # 1. 发送关闭信号
- logging.info("Signaling worker to shut down.")
- shutdown_event.set()
- # 2. 等待后台任务完成
- try:
- # 给一个超时时间,防止无限期等待
- await asyncio.wait_for(app.state.worker_task, timeout=5.0)
- logging.info("Worker task has been awaited.")
- except asyncio.TimeoutError:
- logging.error("Worker task did not finish in time, cancelling.")
- app.state.worker_task.cancel() # 如果超时,强制取消
- # 等待取消操作完成
- await asyncio.gather(app.state.worker_task, return_exceptions=True)
- except Exception as e:
- logging.error(f"An error occurred during worker shutdown: {e}")
- app = FastAPI(lifespan=lifespan)
- @app.get("/")
- async def root():
- app.state.worker_task = asyncio.create_task(my_background_worker("Worker-1"))
- return {"message": "Hello World"}
- if __name__ == '__main__':
- import uvicorn
- uvicorn.run(app)
|