Maison >développement back-end >Tutoriel Python >Comment exécuter des tâches en arrière-plan dans une application FastAPI ?
Lors de la construction d'un serveur Python à l'aide FastAPI, il peut être nécessaire d'incorporer une fonction qui n'est pas liée à la logique de l'API mais qui doit s'exécuter en arrière-plan de manière récurrente. Par exemple, cela pourrait impliquer de vérifier des API externes et d'imprimer des informations en fonction des réponses.
Une approche consiste à créer un fil qui exécute la fonction souhaitée. Voici comment cela peut être réalisé :
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
Une méthode alternative consiste à utiliser un planificateur d'événements pour la tâche en arrière-plan :
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
Si la tâche est une fonction de définition asynchrone, elle peut être ajoutée à la boucle d'événement actuelle en utilisant la fonction asyncio.create_task() :
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!