Heim >Backend-Entwicklung >Python-Tutorial >Beherrschen Sie Python Async IO mit FastAPI
Da es sich bei Python um eine interpretierte Sprache handelt, ist die Reaktionszeit bei Verwendung für die Back-End-Entwicklung, beispielsweise in Kombination mit Python Django, im Vergleich zu Java Spring etwas länger. Solange der Code jedoch angemessen ist, ist der Unterschied nicht allzu groß. Selbst wenn Django den Multiprozessmodus verwendet, ist seine Fähigkeit zur gleichzeitigen Verarbeitung immer noch viel schwächer. Python bietet einige Lösungen zur Verbesserung der gleichzeitigen Verarbeitungsfähigkeiten. Beispielsweise kann durch die Verwendung des asynchronen Frameworks FastAPI mit seinen asynchronen Funktionen die Fähigkeit zur gleichzeitigen Verarbeitung von E/A-intensiven Aufgaben erheblich verbessert werden. FastAPI ist eines der schnellsten Python-Frameworks.
Werfen wir zunächst einen kurzen Blick auf die Verwendung von FastAPI.
Installation:
pip install fastapi
Einfacher serverseitiger Code:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Startup:
uvicorn app:app --reload
Wir können sehen, dass die Schnittstelle von FastAPI im Vergleich zu anderen Frameworks nur über ein zusätzliches Schlüsselwort async verfügt. Das Schlüsselwort async definiert die Schnittstelle als asynchron. Allein anhand des Rückgabeergebnisses können wir den Unterschied zwischen FastAPI und anderen Python-Frameworks nicht erkennen. Der Unterschied liegt im gleichzeitigen Zugriff. Wenn die Server-Threads von FastAPI Routenanfragen wie http://127.0.0.1:8000/ verarbeiten und auf Netzwerk-E/A stoßen, warten sie nicht mehr darauf, sondern bearbeiten stattdessen andere Anfragen. Wenn die Netzwerk-E/A abgeschlossen ist, wird die Ausführung fortgesetzt. Diese asynchrone Fähigkeit verbessert die Verarbeitungsfähigkeit von E/A-intensiven Aufgaben.
Schauen wir uns ein anderes Beispiel an. Im Geschäftscode wird eine explizite asynchrone Netzwerkanforderung initiiert. Diese Netzwerk-E/A wird von FastAPI ebenso wie Routenanfragen asynchron verarbeitet.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
Wenn Sie möchten, dass die Datenbank-E/A asynchron erfolgt, benötigen Sie die Unterstützung asynchroner Vorgänge vom Datenbanktreiber oder ORM.
Die Kernimplementierung der Asynchronität von FastAPI ist asynchrone E/A. Wir können einen Server mit asynchronen Verarbeitungsfunktionen direkt über asynchrone E/A starten, ohne FastAPI zu verwenden.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
Wenn dieses Beispiel gestartet wird, ist das Rückgabeergebnis von http://127.0.0.1:8000/ dasselbe wie das von Beispiel 1. Das zugrunde liegende Implementierungsprinzip der asynchronen E/A sind „Coroutinen“ und „Ereignisschleifen“. .
pip install fastapi
Der Funktionsindex wird mit async def definiert, was bedeutet, dass es sich um eine Coroutine handelt. Das Schlüsselwort „await“ wird vor einer E/A-Operation verwendet, um dem Ausführungsthread mitzuteilen, dass er nicht auf diese E/A-Operation warten soll. Der Aufruf normaler Funktionen wird über den Stapel implementiert, und Funktionen können nur einzeln aufgerufen und ausgeführt werden. Eine Coroutine ist jedoch eine besondere Art von Funktion (kein kollaborativer Thread). Dadurch kann der Thread die Ausführung an der Wartemarke anhalten und zur Ausführung anderer Aufgaben wechseln. Wenn der E/A-Vorgang abgeschlossen ist, wird die Ausführung fortgesetzt.
Werfen wir einen Blick auf die Auswirkung der gleichzeitigen Ausführung mehrerer Coroutinen.
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Ausgabe:
uvicorn app:app --reload
Wir können sehen, dass der Thread die drei Aufgaben nicht einzeln ausführt. Wenn es auf einen E/A-Vorgang stößt, wechselt es zur Ausführung anderer Aufgaben. Nachdem der E/A-Vorgang abgeschlossen ist, wird er weiter ausgeführt. Es ist auch ersichtlich, dass die drei Coroutinen im Grunde gleichzeitig mit dem Warten auf den E/A-Vorgang beginnen, sodass die endgültigen Abschlusszeiten für die Ausführung im Wesentlichen gleich sind. Obwohl die Ereignisschleife hier nicht explizit verwendet wird, wird sie von asyncio.run implizit verwendet.
Coroutinen werden durch Generatoren implementiert. Generatoren können die Ausführung von Funktionen anhalten und auch wieder fortsetzen, was die Eigenschaften von Coroutinen sind.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
Wenn der Generator mit next() läuft und auf Yield stößt, wird er angehalten. Wenn next() erneut ausgeführt wird, wird die Ausführung an der Stelle fortgesetzt, an der sie das letzte Mal angehalten wurde. Vor Python 3.5 wurden Coroutinen auch mit „Annotationen“ geschrieben. Ab Python 3.5 wird asynchrones Def-Await verwendet.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
Die Pausen- und Fortsetzungsfunktionen von Generatoren können nicht nur für Coroutinen, sondern auch für viele andere Zwecke verwendet werden. Beispielsweise können Algorithmen während der Schleife berechnet und gespeichert werden. Zum Beispiel die Implementierung eines Pascal-Dreiecks (beide Enden jeder Zeile sind 1 und die Zahlen an anderen Positionen sind die Summe der beiden Zahlen darüber).
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
Ausgabe:
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
Da die Ausführung einer Coroutine angehalten werden kann, stellt sich die Frage: Wann wird die Ausführung der Coroutine wieder aufgenommen? Dies erfordert die Verwendung einer Ereignisschleife, um den Ausführungsthread darüber zu informieren.
Main started Coroutine 1 started at 2024-12-27 12:28:01.661251 Coroutine 2 started at 2024-12-27 12:28:01.661276 Coroutine 3 started at 2024-12-27 12:28:01.665012 Coroutine 1 finished at 2024-12-27 12:28:02.665125 Coroutine 2 finished at 2024-12-27 12:28:02.665120 Coroutine 3 finished at 2024-12-27 12:28:02.665120 Main finished
Die Ereignisschleife nutzt die I/O-Multiplexing-Technologie und wechselt ständig, um Ereignisse zu überwachen, bei denen Coroutinen weiterhin ausgeführt werden können. Wenn sie ausgeführt werden können, führt der Thread die Coroutinen weiterhin aus.
Um I/O-Multiplexing auf einfache Weise zu verstehen: Ich bin der Chef einer Kurierstation. Ich muss nicht jeden Kurier aktiv nach der Erledigung seiner Aufgaben fragen. Stattdessen kommen die Kuriere nach Erledigung ihrer Aufgaben selbstständig zu mir. Dadurch verbessert sich meine Fähigkeit zur Aufgabenverarbeitung und ich kann mehr Dinge erledigen.
Auswahl, Umfrage und Epoll können alle E/A-Multiplexing erreichen. Im Vergleich zu Select und Poll weist Epoll eine bessere Leistung auf. Linux verwendet im Allgemeinen standardmäßig Epoll und macOS verwendet Kqueue, das Epoll ähnelt und eine ähnliche Leistung aufweist.
pip install fastapi
Starten Sie den Server-Socket, um den angegebenen Port zu überwachen. Bei der Ausführung auf einem Linux-System verwenden Selektoren standardmäßig epoll als Implementierung. Der Code verwendet Epoll, um ein Anforderungsempfangsereignis (Akzeptanzereignis) zu registrieren. Wenn eine neue Anforderung eintrifft, löst Epoll die Ereignisbehandlungsfunktion aus und führt sie aus. Gleichzeitig registriert es ein Leseereignis (Leseereignis), um die Anforderungsdaten zu verarbeiten und darauf zu reagieren. Beim Zugriff von der Webseite mit http://127.0.0.1:8000/ ist das Rückgabeergebnis dasselbe wie in Beispiel 1. Server, auf dem log:
ausgeführt wird
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Verwenden Sie Socket direkt, um einen Server zu starten. Beim Zugriff mit einem Browser unter http://127.0.0.1:8080/ oder mit Curl http://127.0.0.1:8080/ wird {"Hello": "World"}
zurückgegeben
uvicorn app:app --reload
Bei Zugriff mit Curl http://127.0.0.1:8001/, Server läuft Protokoll:
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
Asynchrone E/A wird auf der untersten Ebene mithilfe von „Coroutinen“ und „Ereignisschleifen“ implementiert. „Coroutinen“ stellen sicher, dass der Thread, wenn er während der Ausführung auf markierte E/A-Vorgänge stößt, nicht auf den Abschluss der E/A warten muss, sondern anhalten und den Thread andere Aufgaben ausführen lassen kann, ohne ihn zu blockieren. „Ereignisschleifen“ nutzen die E/A-Multiplexing-Technologie und überwachen ständig E/A-Ereignisse. Wenn ein bestimmtes E/A-Ereignis abgeschlossen ist, wird der entsprechende Rückruf ausgelöst, sodass die Coroutine die Ausführung fortsetzen kann.
Lassen Sie mich abschließend die ideale Plattform für die Bereitstellung von Flask/FastAPI vorstellen: Leapcell.
Leapcell ist eine Cloud-Computing-Plattform, die speziell für moderne verteilte Anwendungen entwickelt wurde. Das Pay-as-you-go-Preismodell stellt sicher, dass keine Leerlaufkosten anfallen, d. h. Benutzer zahlen nur für die Ressourcen, die sie tatsächlich nutzen.
Die einzigartigen Vorteile von Leapcell für WSGI/ASGI-Anwendungen:
Erfahren Sie mehr in der Dokumentation!
Leapcell Twitter: https://x.com/LeapcellHQ
Das obige ist der detaillierte Inhalt vonBeherrschen Sie Python Async IO mit FastAPI. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!