ホームページ >バックエンド開発 >Python チュートリアル >FastAPI の非同期プログラミングで API パフォーマンスを大幅に向上
API ゲームを次のレベルに引き上げる準備はできていますか?
FastAPI は、API をより高速にし、応答性を高め、プロのように重い負荷を処理できるようにします。
この記事では、FastAPI で非同期プログラミングを活用して高パフォーマンスの API を構築する方法を説明します。最終的には、非同期エンドポイントを実装し、効果的にテストするための知識が身につくでしょう。
マスターする内容は次のとおりです:
非同期プログラミングにより、タスクを同時に実行できます。これは、応答を待つことが一般的なネットワーク リクエスト、データベース クエリ、ファイル操作などのタスクに特に役立ちます。
従来の同期プログラミングでは、タスクが順番に実行されるため、複数のリクエストを処理するときに遅延が発生します。非同期プログラミングを使用すると、複数のユーザーに同時にサービスを提供し、リソースの使用率を最大化し、より良いユーザー エクスペリエンスを確保できます。
さあ、腕まくりして素晴らしいものを作りましょう!
まず、必要なライブラリをインストールします。
pip install "fastapi[standard]" httpx aiofiles pytest
以下は、FastAPI での非同期プログラミングを示す完全な例です。コードの各部分は独自の目的を果たしており、順を追って説明します。
from fastapi import FastAPI, BackgroundTasks import httpx import aiofiles import pytest from fastapi.testclient import TestClient app = FastAPI() # API Endpoints @app.get("/item/{item_id}") async def read_item(item_id: int): return {"item_id": item_id} @app.get("/external-api") async def call_external_api(): async with httpx.AsyncClient() as client: response = await client.get("https://jsonplaceholder.typicode.com/posts/1") return response.json() @app.get("/read-file") async def read_file(): async with aiofiles.open("example.txt", mode="r") as file: content = await file.read() return {"content": content} def send_email(email: str, message: str): print(f"Sending email to {email} with message: {message}") @app.post("/send-email/") async def schedule_email(background_tasks: BackgroundTasks, email: str): background_tasks.add_task(send_email, email, "Welcome!") return {"message": "Email scheduled"} # Testing Code client = TestClient(app) def test_read_item(): response = client.get("/item/1") assert response.status_code == 200 assert response.json() == {"item_id": 1} def test_read_file(): # Create an example file for testing with open("example.txt", "w") as file: file.write("This is a test file content") response = client.get("/read-file") assert response.status_code == 200 assert response.json() == {"content": "This is a test file content"} def test_schedule_email(): response = client.post("/send-email/?email=fogigav197@rabitex.com") assert response.status_code == 200 assert response.json() == {"message": "Email scheduled"} @pytest.mark.asyncio async def test_call_external_api(): async with httpx.AsyncClient() as async_client: response = await async_client.get("https://jsonplaceholder.typicode.com/posts/1") assert response.status_code == 200 assert "id" in response.json()
1.単純な非同期エンドポイント
@app.get("/item/{item_id}") async def read_item(item_id: int): return {"item_id": item_id}
このエンドポイントは、async def を使用して基本的な非同期ルートを定義する方法を示します。 ID によってアイテムを取得します。
2.外部 API の呼び出し
@app.get("/external-api") async def call_external_api(): async with httpx.AsyncClient() as client: response = await client.get("https://jsonplaceholder.typicode.com/posts/1") return response.json()
これは、httpx を使用してノンブロッキング HTTP リクエストを作成する方法を示しています。外部 API 応答を待機している間、アプリケーションは他のリクエストを処理できます。
3.非同期ファイル読み取り
@app.get("/read-file") async def read_file(): async with aiofiles.open("example.txt", mode="r") as file: content = await file.read() return {"content": content}
これはファイルを非同期に読み取り、ファイル操作によってアプリケーションがブロックされないようにします。
4.バックグラウンドタスクの実行
def send_email(email: str, message: str): print(f"Sending email to {email} with message: {message}") @app.post("/send-email/") async def schedule_email(background_tasks: BackgroundTasks, email: str): background_tasks.add_task(send_email, email, "Welcome!") return {"message": "Email scheduled"}
このエンドポイントは電子メールを送信するバックグラウンド タスクをスケジュールし、メイン スレッドが他のリクエストを処理できるようにします。
1.基本エンドポイントのテスト
def test_read_item(): response = client.get("/item/1") assert response.status_code == 200 assert response.json() == {"item_id": 1}
これにより、/item/{item_id} エンドポイントが予期したデータを返すことが検証されます。
2.ファイルの読み込みテスト
pip install "fastapi[standard]" httpx aiofiles pytest
これにより、テスト ファイルが作成され、/read-file エンドポイントがその内容を正しく読み取って返すかどうかがチェックされます。
3.バックグラウンド タスクのテスト
from fastapi import FastAPI, BackgroundTasks import httpx import aiofiles import pytest from fastapi.testclient import TestClient app = FastAPI() # API Endpoints @app.get("/item/{item_id}") async def read_item(item_id: int): return {"item_id": item_id} @app.get("/external-api") async def call_external_api(): async with httpx.AsyncClient() as client: response = await client.get("https://jsonplaceholder.typicode.com/posts/1") return response.json() @app.get("/read-file") async def read_file(): async with aiofiles.open("example.txt", mode="r") as file: content = await file.read() return {"content": content} def send_email(email: str, message: str): print(f"Sending email to {email} with message: {message}") @app.post("/send-email/") async def schedule_email(background_tasks: BackgroundTasks, email: str): background_tasks.add_task(send_email, email, "Welcome!") return {"message": "Email scheduled"} # Testing Code client = TestClient(app) def test_read_item(): response = client.get("/item/1") assert response.status_code == 200 assert response.json() == {"item_id": 1} def test_read_file(): # Create an example file for testing with open("example.txt", "w") as file: file.write("This is a test file content") response = client.get("/read-file") assert response.status_code == 200 assert response.json() == {"content": "This is a test file content"} def test_schedule_email(): response = client.post("/send-email/?email=fogigav197@rabitex.com") assert response.status_code == 200 assert response.json() == {"message": "Email scheduled"} @pytest.mark.asyncio async def test_call_external_api(): async with httpx.AsyncClient() as async_client: response = await async_client.get("https://jsonplaceholder.typicode.com/posts/1") assert response.status_code == 200 assert "id" in response.json()
これは、バックグラウンド電子メール タスクが正常にスケジュールされているかどうかをテストします。
4.外部 API 呼び出しのテスト
@app.get("/item/{item_id}") async def read_item(item_id: int): return {"item_id": item_id}
これにより、/external-api エンドポイントが外部ソースからデータを正しくフェッチできるようになります。
提供されたコードを使用すると、FastAPI を使用して非同期 API を構築およびテストする方法を実践的に理解できるようになりました。ファイルの処理、外部 API の呼び出し、バックグラウンド タスクのスケジュールのいずれであっても、非同期プログラミングを使用すると、簡単に拡張できる高性能アプリケーションを作成できます。
次の FastAPI プロジェクトを構築する準備はできましたか?始めましょう!
読んでいただきありがとうございます...
コーディングを楽しんでください!
以上がFastAPI の非同期プログラミングで API パフォーマンスを大幅に向上の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。