非同期コルーチン開発の実践: 高パフォーマンスのメッセージ キュー システムの構築
インターネットの発展に伴い、メッセージ キュー システムは高パフォーマンスのメッセージ キュー システムを構築するための重要なツールになりました。 - パフォーマンスとスケーラブル 分散システムの主要コンポーネント。メッセージ キュー システムを構築する場合、非同期コルーチンを適用すると、システムのパフォーマンスとスケーラビリティを効果的に向上させることができます。この記事では、高パフォーマンスなメッセージキューシステムの構築を例に、非同期コルーチンの実践的な開発を紹介し、具体的なコード例を示します。
1.1 軽量: 非同期コルーチンは追加のスレッドを作成する必要がなく、少数のコルーチンを作成するだけで済みます。 -スケールの同時実行を実現できます。これにより、システム リソースの消費が大幅に削減されます。
1.2 効率: 非同期コルーチンは、ノンブロッキング I/O およびイベント駆動メカニズムを利用して、非常に低いオーバーヘッドで効率的なタスクのスケジューリングと処理を実現し、コンテキスト切り替えのオーバーヘッドの影響を受けません。
1.3 スケーラビリティ: 非同期コルーチンは、スレッド プール サイズなどのパラメーターを手動で調整する必要がなく、システム負荷の増加に応じて自動的に拡張できます。
import asyncio message_queue = [] subscriptions = {} async def publish(channel, message): message_queue.append((channel, message)) await notify_subscribers() async def notify_subscribers(): while message_queue: channel, message = message_queue.pop(0) for subscriber in subscriptions.get(channel, []): asyncio.ensure_future(subscriber(message)) async def subscribe(channel, callback): if channel not in subscriptions: subscriptions[channel] = [] subscriptions[channel].append(callback) async def consumer(message): print("Received message:", message) async def main(): await subscribe("channel1", consumer) await publish("channel1", "hello world") if __name__ == "__main__": asyncio.run(main())
上記のコードでは、公開されたメッセージを保存するために message_queue
リストを使用します。購読者と対応するチャネルを保存するための辞書 subscriptions
。 publish
関数はメッセージの公開に使用され、notify_subscribers
関数は購読者に通知するために使用され、subscribe
関数はチャネルの購読に使用され、consumer
function Consumer を例として挙げます。
main
関数では、まず subscribe
関数を使用して channel1
チャネルにサブスクライブし、consumer
を指定します。加入者向けの機能。次に、publish
関数を使用して channel1
チャネルにメッセージをパブリッシュすると、notify_subscribers
がメッセージを購読者に自動的に送信します。
以下は、非同期 I/O とコルーチン プールに基づくメッセージ キュー システム用に最適化されたサンプル コードです。
import asyncio from concurrent.futures import ThreadPoolExecutor message_queue = [] subscriptions = {} executor = ThreadPoolExecutor() async def publish(channel, message): message_queue.append((channel, message)) await notify_subscribers() async def notify_subscribers(): while message_queue: channel, message = message_queue.pop(0) for subscriber in subscriptions.get(channel, []): await execute(subscriber(message)) async def execute(callback): loop = asyncio.get_running_loop() await loop.run_in_executor(executor, callback) async def subscribe(channel, callback): if channel not in subscriptions: subscriptions[channel] = [] subscriptions[channel].append(callback) async def consumer(message): print("Received message:", message) async def main(): await subscribe("channel1", consumer) await publish("channel1", "hello world") if __name__ == "__main__": asyncio.run(main())
最適化されたサンプル コードでは、executor を使用します。
コルーチン プールを作成し、execute
関数を介して実行できるようにコールバック関数をコルーチン プールに配置します。これにより、過剰なコンテキストの切り替えが回避され、コールバック関数が同時に実行され、メッセージ処理機能が向上します。
もちろん、実際のメッセージキューシステムでは、メッセージ永続化やメッセージ確認機構の導入、水平拡張など、さらに最適化や拡張が可能です。
以上が非同期コルーチン開発の実践: 高性能メッセージ キュー システムの構築の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。