FastAPI での非同期タスク処理にメッセージ キューを使用する方法
はじめに:
Web アプリケーションでは、電子メールの送信やレポートの生成など、時間のかかるタスクを処理する必要があることがよくあります。 、など。これらのタスクが同期の要求と応答のプロセスに配置されると、ユーザーは長時間待機する必要があり、ユーザー エクスペリエンスとサーバーの応答速度が低下します。この問題を解決するには、非同期タスク処理にメッセージ キューを使用します。この記事では、メッセージ キューを使用して FastAPI フレームワークで非同期タスクを処理する方法を紹介し、対応するコード例を示します。
1. メッセージ キューとは何ですか?
メッセージ キューは、アプリケーション コンポーネント間の非同期通信のためのメカニズムです。これにより、送信者はメッセージをキューに送信し、受信者はキューからこれらのメッセージを取得して処理できるようになります。メッセージ キューの利点は、送信者と受信者が分離されていることです。送信者は、他のタスクの実行を続ける前に受信者が処理を完了するのを待つ必要がないため、システムのスループットと同時実行パフォーマンスが向上します。
2. 適切なメッセージ キュー サービスの選択
メッセージ キューを使用する前に、適切なメッセージ キュー サービスを選択する必要があります。現在、より一般的に使用されているメッセージ キュー サービスには、RabbitMQ、Kafka、ActiveMQ などが含まれます。これらのメッセージキューサービスは豊富な機能と信頼性保証を提供しており、実際のニーズに応じて適切なサービスを選択できます。
3. FastAPI でのメッセージ キューの使用
FastAPI でメッセージ キューを使用するには、まず対応するメッセージ キュー クライアント ライブラリをインストールする必要があります。 RabbitMQ を例に挙げると、pip install aio-pika
コマンドを使用してインストールできます。インストールが完了したら、対応する依存関係とモジュールを FastAPI のメイン ファイルに導入できます。
from fastapi import FastAPI from fastapi import BackgroundTasks from aio_pika import connect, IncomingMessage
次に、メッセージ キューの接続情報を構成し、メッセージを処理する関数を作成する必要があります。
AMQP_URL = "amqp://guest:guest@localhost/" QUEUE_NAME = "task_queue" async def process_message(message: IncomingMessage): # 在这里编写异步任务的处理逻辑 # 例如发送邮件、生成报表等 print(f"Received message: {message.body}") # 这里可以根据实际情况进行任务处理 # ... message.ack()
次に、非同期処理が必要なタスクを受け取るためのインターフェイスを FastAPI アプリケーションに定義する必要があります。
app = FastAPI() @app.post("/task") async def handle_task(request: dict, background_tasks: BackgroundTasks): connection = await connect(AMQP_URL) channel = await connection.channel() queue = await channel.declare_queue(QUEUE_NAME) # 发送任务给消息队列 await queue.publish( body=str(request).encode(), routing_key=QUEUE_NAME ) connection.close() return {"message": "Task submitted successfully"}
上記のコードは、POST インターフェイス /task
を定義します。リクエストが受信されると、タスクは非同期処理のためにメッセージ キューに渡され、処理後に成功メッセージが返されます。完成されました。
最後に、メッセージ キューをリッスンして非同期タスクを処理する非同期関数を作成する必要があります。
async def listen_to_queue(): connection = await connect(AMQP_URL) channel = await connection.channel() queue = await channel.declare_queue(QUEUE_NAME) # 持续监听消息队列 async with queue.iterator() as queue_iterator: async for message in queue_iterator: async with message.process(): await process_message(message)
FastAPI アプリケーションの入り口で、メッセージ キューをリッスンする非同期関数を開始する必要があります。
app = FastAPI() @app.on_event("startup") async def startup_event(): # 启动消息队列监听 await listen_to_queue()
ここまでで、FastAPI のメッセージキューを使用した非同期タスク処理の設定とコーディングが完了しました。
結論:
メッセージ キューを使用すると、時間のかかるタスクを同期プロセスから分離し、アプリケーションのパフォーマンスと応答速度を向上させることができます。この記事では、FastAPI でメッセージ キューを構成および使用する方法について説明し、対応するコード例を示します。非同期タスク処理を開発する際の参考になれば幸いです。
参考文献:
[1] https://fastapi.tiangolo.com/
[2] https://docs.aio-pika.readthedocs.io/
以上がFastAPI での非同期タスク処理にメッセージ キューを使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。