FastAPI 中的后台作业:异步运行函数
在 FastAPI 中,当您想要执行不应阻塞的异步操作时,后台函数是必需的或延迟主要 API 操作。让我们探讨一下如何有效地设置后台作业。
1.基于线程的后台任务:
启动线程是在后台运行函数的一种简单方法:
def start_worker(): print('[main]: starting worker...') my_worker = worker.Worker() my_worker.working_loop() # this function prints "hello" every 5 seconds if __name__ == '__main__': print('[main]: starting...') uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
但是,这种方法有一个限制:它会阻塞主线程的执行。为了避免这种情况,请在主 API 进程之前启动线程,如下所示:
_worker_thread.start() uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
2.基于事件的后台任务:
另一种选择是使用基于事件的调度程序:
import sched, time from threading import Thread from fastapi import FastAPI import uvicorn app = FastAPI() s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start() if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)
3。 AsyncIO 后台任务:
如果您的后台函数是异步的,您可以使用 asyncio.create_task() 函数:
from fastapi import FastAPI from contextlib import asynccontextmanager import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Run at startup asyncio.create_task(print_task(5)) yield # Run on shutdown (if required) print('Shutting down...') app = FastAPI(lifespan=lifespan)
以上是如何在FastAPI中高效运行后台作业?的详细内容。更多信息请关注PHP中文网其他相关文章!