ホームページ >バックエンド開発 >Python チュートリアル >KubeMQ による GenAI アプリケーションの強化: 検索拡張生成 (RAG) の効率的なスケーリング
生成 AI (GenAI) の採用が業界全体で急増する中、組織は検索拡張生成 (RAG) 技術をますます活用して、リアルタイムでコンテキスト豊富な AI モデルを強化しています。データ。このようなアプリケーションで情報の複雑なフローを管理することは、特に継続的に生成される大規模なデータを扱う場合に、重大な課題を引き起こします。 堅牢なメッセージ ブローカーである KubeMQ は、複数の RAG プロセスのルーティングを合理化し、GenAI アプリケーションでの効率的なデータ処理を保証するソリューションとして登場しました。
RAG ワークフローの効率とスケーラビリティをさらに強化するには、FalkorDB のような高性能データベースを統合することが不可欠です。 FalkorDB は、RAG システムが依存する動的なナレッジ ベースに信頼性が高くスケーラブルなストレージ ソリューションを提供し、迅速なデータ取得と KubeMQ のようなメッセージング システムとのシームレスな統合を保証します。
RAG は、検索メカニズムを統合することで生成 AI モデルを強化し、推論中にモデルが外部の知識ベースにアクセスできるようにするパラダイムです。このアプローチでは、利用可能な最新の関連情報に基づいて生成された応答の精度、関連性、適時性が大幅に向上します。
RAG を使用する一般的な GenAI ワークフローでは、プロセスには複数のステップが含まれます。
クエリ処理: ユーザーの入力を解釈して意図とコンテキストを理解します
取得: FalkorDB などの動的なナレッジ ベースから関連するドキュメントやデータを取得します。これにより、最新の関連情報に迅速かつ効率的にアクセスできます。
生成: 入力データと取得したデータの両方を使用して応答を生成します
応答配信: 最終的な充実した出力をユーザーに提供します
特にデータが継続的に生成および更新される環境でこれらのステップを拡張するには、RAG パイプラインのさまざまなコンポーネント間のデータ フローのための効率的で信頼性の高いメカニズムが必要です。
IoT ネットワーク、ソーシャル メディア プラットフォーム、リアルタイム分析システムなどのシナリオでは、新しいデータが絶えず生成され、AI モデルはこの情報を組み込むために迅速に適応する必要があります。従来の要求/応答アーキテクチャは、高スループット条件下ではボトルネックとなり、レイテンシーの問題やパフォーマンスの低下につながる可能性があります。
KubeMQ は、サービス間の効率的なデータ ルーティングのためのスケーラブルで堅牢なインフラストラクチャを提供することで、高スループットのメッセージング シナリオを管理します。 KubeMQ を RAG パイプラインに統合することにより、新しいデータ ポイントがそれぞれメッセージ キューまたはストリームにパブリッシュされ、システムに負荷をかけることなく、取得コンポーネントが最新の情報に即時にアクセスできるようになります。このリアルタイム データ処理機能は、GenAI 出力の関連性と正確性を維持するために非常に重要です。
KubeMQ は、キュー、ストリーム、パブリッシュ/サブスクライブ (pub/sub)、リモート プロシージャ コール (RPC) などのさまざまなメッセージング パターンを提供し、RAG パイプライン内で多用途かつ強力なルーターとなります。その低レイテンシーと高性能特性により、迅速なメッセージ配信が保証されます。これは、遅延がユーザー エクスペリエンスやシステム効率に大きな影響を与える可能性があるリアルタイム GenAI アプリケーションにとって不可欠です。
さらに、複雑なルーティング ロジックを処理できる KubeMQ の機能により、高度なデータ分散戦略が可能になります。これにより、AI システムのさまざまなコンポーネントが、不必要な重複や遅延を発生させることなく、必要なデータを必要なときに正確に受信できるようになります。
KubeMQ はサービス間でメッセージを効率的にルーティングしますが、FalkorDB は、RAG プロセスに必要な膨大な量のデータを保存および取得するためのスケーラブルで高性能なグラフ データベース ソリューションを提供することでこれを補完します。この統合により、新しいデータが KubeMQ を介して流れると、シームレスに FalkorDB に保存され、レイテンシーやボトルネックを発生させることなく、すぐに取得操作に利用できるようになります。
GenAI アプリケーションのユーザー ベースとデータ量の両方が増加するにつれて、スケーラビリティが最大の懸念事項になります。 KubeMQ はスケーラブルであり、負荷の増加にシームレスに対応するための水平スケーリングをサポートしています。これにより、RAG プロセスの数が増加したり、データ生成が加速したりしても、メッセージング インフラストラクチャの堅牢性と応答性が維持されます。
さらに、KubeMQ はメッセージの永続性とフォールト トレランスを提供します。システム障害やネットワーク中断が発生した場合、KubeMQ はメッセージが失われず、システムが正常に回復できることを保証します。この信頼性は、ユーザーがタイムリーで正確な情報を得るために依存する AI アプリケーションの整合性を維持する上で非常に重要です。
RAG パイプラインでのデータ処理のためのカスタム ルーティング サービスの実装は、リソースを大量に消費し、複雑になる可能性があります。多くの場合、これらのサービスを構築、維持、拡張するには多大な開発労力が必要となり、コアの AI アプリケーション開発から焦点がそらされてしまいます。
KubeMQ を採用することで、組織はオーダーメイドのルーティング ソリューションを作成する必要がなくなります。 KubeMQ は、複雑なルーティング パターン、メッセージ フィルタリング、優先順位の処理など、RAG プロセスのルーティング ニーズに対処するすぐに使える機能を提供します。これにより、開発とメンテナンスのオーバーヘッドが削減されるだけでなく、GenAI ソリューションの市場投入までの時間が短縮されます。
KubeMQ は、メッセージ ブローカー機能と対話するための複数のインターフェイスを提供します。
REST API: 言語に依存しない統合を可能にし、任意のプログラミング言語で記述されたサービスが HTTP 経由でメッセージを送受信できるようにします
SDK: さまざまなプログラミング言語 (Python、Java、Go、.NET など) のクライアント ライブラリを提供し、ネイティブ統合を通じてより効率的な通信パターンとパフォーマンスの向上を促進します
この柔軟性により、開発者は特定のユースケースに最適な方法を選択できるため、アーキテクチャが簡素化され、開発サイクルが加速されます。データ ルーティングの単一タッチポイントにより、RAG パイプラインのさまざまなコンポーネント間の通信が合理化され、システム全体の一貫性が強化されます。
このコード例は、KubeMQ を RAG パイプラインに統合して映画情報検索システムを構築する方法を示しています。 GPT-4 を使用してナレッジ グラフを構築するために、Rotten Tomatoes から映画 URL を取り込むサーバーをセットアップします。ユーザーはチャット クライアントを通じてこのシステムと対話し、映画関連のクエリを送信し、AI が生成した応答を受信できます。このユースケースでは、映画のコンテキスト内で効率的なメッセージ処理とサービス間通信に KubeMQ を利用して、実際のアプリケーションで継続的なデータの取り込みとリアルタイムのクエリ処理を処理する方法を示します。
データ取り込みサービス: 新しいデータが利用可能になったときにキャプチャして KubeMQ ストリームに公開します
取得サービス: KubeMQ ストリームをサブスクライブして更新を受信し、ナレッジ ベースを更新します
生成サービス: クエリリクエストをリッスンし、AI モデルと対話し、応答を生成します
レスポンス サービス: 生成されたレスポンスを適切なチャネルを通じてユーザーに送り返します
KubeMQ が動作していることを確認します。これは、Docker を使用して KubeMQ をデプロイすることで実現できます。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
このコマンドは、REST および gRPC 通信に必要なポートを公開して KubeMQ を開始します。
このコード (GitHub リポジトリ) は、チャット クエリを処理し、メッセージ処理に KubeMQ を使用してナレッジ ソースを管理する RAG サーバーを実装します。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
サーバーは 2 つのメイン スレッドを実行します。1 つは「rag-chat-query」と呼ばれるチャネルを通じてチャット クエリをサブスクライブし、GPT-4 のナレッジ グラフを使用して処理します。もう 1 つは「rag-chat-query」と呼ばれるキューから継続的にプルします。 -sources-queue」を使用して、新しいソースをナレッジ グラフに追加します。ナレッジ グラフは、JSON ファイルからロードされたカスタム オントロジーで初期化され、処理に OpenAI の GPT-4 モデルを使用します。サーバーは正常なシャットダウン処理とエラー管理を実装し、サーバーの停止時にすべてのスレッドが適切に終了するようにします。
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
このコードは、KubeMQ のキュー システムを通じてムービー URL を RAG サーバーに送信する単純なクライアントを実装します。具体的には、KubeMQ に接続する SourceClient クラスを作成し、RAG サーバーが監視するのと同じキューである「rag-sources-queue」チャネルにメッセージを送信します。メイン プログラムとして実行すると、Rotten Tomatoes の映画 URL (マトリックス映画、ジョン ウィック、スピードを含む) のリストが送信され、RAG サーバーによって処理され、ナレッジ グラフに追加されます。
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
このコードは、KubeMQ のクエリ システムを通じて RAG サーバーと通信するチャット クライアントを実装します。 ChatClient クラスは、「rag-chat-query」チャネルにメッセージを送信し、クエリごとに 30 秒のタイムアウトで応答を待ちます。メイン プログラムとして実行すると、マトリックスの監督とキアヌ リーブスとの関係に関する 2 つの関連する質問を送信し、受信した各応答を出力することでクライアントの機能を示します。
すべてのコード例は、元の GitHub リポジトリの私のフォークにあります。
KubeMQ を GenAI アプリケーションの RAG パイプラインに統合すると、連続データ ストリームと複雑なプロセス間通信を処理するためのスケーラブルで信頼性が高く、効率的なメカニズムが提供されます。 KubeMQ は、汎用性の高いメッセージング パターンを備えた統合ルーターとして機能することで、アーキテクチャ全体を簡素化し、カスタム ルーティング ソリューションの必要性を減らし、開発サイクルを加速します。
さらに、FalkorDB を組み込むことで、KubeMQ とシームレスに統合された高性能のナレッジ ベースが提供され、データ管理が強化されます。この組み合わせにより、最適化されたデータの取得と保存が保証され、RAG プロセスの動的な要件がサポートされます。
高スループットのシナリオを処理する機能と、永続性やフォールト トレランスなどの機能を組み合わせることで、高負荷下やシステム中断に直面した場合でも、GenAI アプリケーションの応答性と信頼性が確保されます。
KubeMQ と FalkorDB を活用することで、組織はデータ ルーティング インフラストラクチャが堅牢で最新の AI ワークフローの要求を満たすことができると確信して、AI モデルの強化と貴重な洞察とサービスの提供に集中できます。
以上がKubeMQ による GenAI アプリケーションの強化: 検索拡張生成 (RAG) の効率的なスケーリングの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。