Heim >Backend-Entwicklung >Python-Tutorial >Wie führe ich Hintergrundaufgaben in einer FastAPI-Anwendung aus?
Beim Aufbau eines Python-Servers mit FastAPI kann es erforderlich sein, eine Funktion zu integrieren, die nichts mit der API-Logik zu tun hat, aber regelmäßig im Hintergrund ausgeführt werden muss. Dies könnte beispielsweise das Überprüfen externer APIs und das Drucken von Informationen basierend auf den Antworten umfassen.
Ein Ansatz besteht darin, einen Thread zu erstellen, der die gewünschte Funktion ausführt. So kann dies erreicht werden:
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
Eine alternative Methode besteht darin, einen Ereignisplaner für die Hintergrundaufgabe zu verwenden:
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
Wenn es sich bei der Aufgabe um eine asynchrone Def-Funktion handelt, kann sie mit zur aktuellen Ereignisschleife hinzugefügt werden die Funktion asyncio.create_task():
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
Das obige ist der detaillierte Inhalt vonWie führe ich Hintergrundaufgaben in einer FastAPI-Anwendung aus?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!