使用构建 Python 服务器时FastAPI,可能需要合并一个与 API 逻辑无关但需要在后台重复运行的函数。例如,这可能涉及检查外部 API 并根据响应打印信息。
一种方法是创建一个执行所需函数的线程。实现方法如下:
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
另一种方法涉及使用事件调度程序来执行后台任务:
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
如果任务是async def函数,可以将其添加到使用 asyncio.create_task() 函数的当前事件循环:
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
以上是如何在 FastAPI 应用程序中运行后台任务?的详细内容。更多信息请关注PHP中文网其他相关文章!