FastAPI 中的后台任务执行
在 FastAPI 中,开发者可能会遇到需要在后台运行某个函数的场景,该函数与他们的API。为此,需要采用线程机制。
解决方案 1:在 uvicorn.run 之前启动线程
一种方法是在调用 uvicorn.run 之前启动线程,因为 uvicorn.run 会阻塞线程。这种方法可以按如下方式实现:
import time import threading from fastapi import FastAPI import uvicorn app = FastAPI() class BackgroundTasks(threading.Thread): def run(self,*args,**kwargs): while True: print('Hello') time.sleep(5) if __name__ == '__main__': t = BackgroundTasks() t.start() uvicorn.run(app, host="0.0.0.0", port=8000)
或者,开发人员可以利用 FastAPI 的启动事件在应用程序启动之前启动线程:
@app.on_event("startup") async def startup_event(): t = BackgroundTasks() t.start()
解决方案 2:事件调度程序
另一个选项是为以下对象实现一个重复事件调度程序后台任务:
import sched, time from threading import Thread from fastapi import FastAPI import uvicorn app = FastAPI() s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start() if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)
解决方案 3:使用事件循环
对于异步后台任务(async def 函数),开发人员可以将当前事件循环与asyncio.create_task() 函数:
from asyncio import create_task from fastapi import FastAPI app = FastAPI() @app.on_event("startup") async def startup_task(): await create_task(print_task(5))
在这种方法中,事件循环在启动时创建uvicorn 服务器。
以上是如何在FastAPI中运行后台任务?的详细内容。更多信息请关注PHP中文网其他相关文章!