Maison >développement back-end >Tutoriel Python >Comment exécuter efficacement des tâches en arrière-plan dans FastAPI ?
Tâches en arrière-plan dans FastAPI : exécuter des fonctions de manière asynchrone
Dans FastAPI, les fonctions en arrière-plan sont nécessaires lorsque vous souhaitez effectuer des opérations asynchrones qui ne doivent pas bloquer ou retarder les principales opérations de l'API. Explorons comment configurer efficacement les tâches en arrière-plan.
1. Tâches en arrière-plan basées sur les threads :
Le démarrage de threads est un moyen simple d'exécuter des fonctions en arrière-plan :
def start_worker(): print('[main]: starting worker...') my_worker = worker.Worker() my_worker.working_loop() # this function prints "hello" every 5 seconds if __name__ == '__main__': print('[main]: starting...') uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
Cependant, cette approche a une limitation : elle bloque le thread principal d'exécution. Pour éviter cela, démarrez votre fil de discussion avant le processus principal de l'API, comme indiqué ci-dessous :
_worker_thread.start() uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
2. Tâches en arrière-plan basées sur des événements :
Une autre option consiste à utiliser des planificateurs basés sur des événements :
import sched, time from threading import Thread from fastapi import FastAPI import uvicorn app = FastAPI() s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start() if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)
3. Tâches d'arrière-plan AsyncIO :
Si votre fonction d'arrière-plan est asynchrone, vous pouvez utiliser la fonction asyncio.create_task() :
from fastapi import FastAPI from contextlib import asynccontextmanager import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Run at startup asyncio.create_task(print_task(5)) yield # Run on shutdown (if required) print('Shutting down...') app = FastAPI(lifespan=lifespan)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!