test01.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import asyncio
  2. from contextlib import asynccontextmanager
  3. from fastapi import FastAPI
  4. import time
  5. import logging
  6. # 配置日志
  7. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  8. # 创建一个 asyncio.Event 来作为关闭信号
  9. shutdown_event = asyncio.Event()
  10. async def my_background_worker(name: str):
  11. """
  12. 一个模拟的后台工作任务,它会持续运行直到收到关闭信号。
  13. """
  14. logging.info(f"Worker '{name}' starting.")
  15. count = 0
  16. while not shutdown_event.is_set():
  17. try:
  18. # 你的业务逻辑
  19. logging.info(f"Worker '{name}' is running, count: {count}")
  20. count += 1
  21. # 使用 asyncio.sleep 而不是 time.sleep,避免阻塞事件循环
  22. await asyncio.sleep(2)
  23. except asyncio.CancelledError:
  24. # 当任务被取消时,正常退出循环
  25. logging.warning(f"Worker '{name}' received cancellation request. Stopping.")
  26. break
  27. logging.info(f"Worker '{name}' has stopped gracefully.")
  28. @asynccontextmanager
  29. async def lifespan(app: FastAPI):
  30. logging.info("Application startup...")
  31. # 在应用启动时,创建并启动后台任务
  32. # 将 task 存储在 app.state 中,以便在关闭时可以访问它
  33. app.state.worker_task = None
  34. yield # 应用在此处运行
  35. logging.info("Application shutdown...")
  36. # 1. 发送关闭信号
  37. logging.info("Signaling worker to shut down.")
  38. shutdown_event.set()
  39. # 2. 等待后台任务完成
  40. try:
  41. # 给一个超时时间,防止无限期等待
  42. await asyncio.wait_for(app.state.worker_task, timeout=5.0)
  43. logging.info("Worker task has been awaited.")
  44. except asyncio.TimeoutError:
  45. logging.error("Worker task did not finish in time, cancelling.")
  46. app.state.worker_task.cancel() # 如果超时,强制取消
  47. # 等待取消操作完成
  48. await asyncio.gather(app.state.worker_task, return_exceptions=True)
  49. except Exception as e:
  50. logging.error(f"An error occurred during worker shutdown: {e}")
  51. app = FastAPI(lifespan=lifespan)
  52. @app.get("/")
  53. async def root():
  54. app.state.worker_task = asyncio.create_task(my_background_worker("Worker-1"))
  55. return {"message": "Hello World"}
  56. if __name__ == '__main__':
  57. import uvicorn
  58. uvicorn.run(app)