Maison >développement back-end >Tutoriel Python >Comment exécuter des tâches en arrière-plan dans une application FastAPI ?

Comment exécuter des tâches en arrière-plan dans une application FastAPI ?

DDD
DDDoriginal
2024-12-05 06:52:10622parcourir

How to Run Background Tasks in a FastAPI Application?

Comment exécuter un fil de discussion en arrière-plan pour FastAPI Python

Défi : incorporer une fonction d'arrière-plan indépendante dans FastAPI

Lors de la construction d'un serveur Python à l'aide FastAPI, il peut être nécessaire d'incorporer une fonction qui n'est pas liée à la logique de l'API mais qui doit s'exécuter en arrière-plan de manière récurrente. Par exemple, cela pourrait impliquer de vérifier des API externes et d'imprimer des informations en fonction des réponses.

Option 1 : Utiliser un fil

Une approche consiste à créer un fil qui exécute la fonction souhaitée. Voici comment cela peut être réalisé :

import threading

def start_worker():
    print('[main]: starting worker...')
    # Define and execute the worker function
    worker = my_worker.Worker()
    worker.working_loop()

if __name__ == '__main__':
    print('[main]: starting...')
    # Start the worker thread
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()

Option 2 : Planification d'événements

Une méthode alternative consiste à utiliser un planificateur d'événements pour la tâche en arrière-plan :

import sched, time
from threading import Thread

s = sched.scheduler(time.time, time.sleep)

def print_event(sc):
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target=start_scheduler)
    thread.start()

Option 3 : Tâches asynchrones avec boucle d'événement

Si la tâche est une fonction de définition asynchrone, elle peut être ajoutée à la boucle d'événement actuelle en utilisant la fonction asyncio.create_task() :

from fastapi import FastAPI
import asyncio

async def print_task(s):
    while True:
        print('Hello')
        await asyncio.sleep(s)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Add the async task to the event loop
    asyncio.create_task(print_task(5))
    yield
    print('Shutting down...')

app = FastAPI(lifespan=lifespan)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn