生成 AI (GenAI) の採用が業界全体で急増する中、組織は検索拡張生成 (RAG) 技術をますます活用して、リアルタイムでコンテキスト豊富な AI モデルを強化しています。データ。このようなアプリケーションで情報の複雑なフローを管理することは、特に継続的に生成される大規模なデータを扱う場合に、重大な課題を引き起こします。 堅牢なメッセージ ブローカーである KubeMQ は、複数の RAG プロセスのルーティングを合理化し、GenAI アプリケーションでの効率的なデータ処理を保証するソリューションとして登場しました。
RAG ワークフローの効率とスケーラビリティをさらに強化するには、FalkorDB のような高性能データベースを統合することが不可欠です。 FalkorDB は、RAG システムが依存する動的なナレッジ ベースに信頼性が高くスケーラブルなストレージ ソリューションを提供し、迅速なデータ取得と KubeMQ のようなメッセージング システムとのシームレスな統合を保証します。
GenAI ワークフローにおける RAG を理解する
RAG は、検索メカニズムを統合することで生成 AI モデルを強化し、推論中にモデルが外部の知識ベースにアクセスできるようにするパラダイムです。このアプローチでは、利用可能な最新の関連情報に基づいて生成された応答の精度、関連性、適時性が大幅に向上します。
RAG を使用する一般的な GenAI ワークフローでは、プロセスには複数のステップが含まれます。
クエリ処理: ユーザーの入力を解釈して意図とコンテキストを理解します
取得: FalkorDB などの動的なナレッジ ベースから関連するドキュメントやデータを取得します。これにより、最新の関連情報に迅速かつ効率的にアクセスできます。
生成: 入力データと取得したデータの両方を使用して応答を生成します
応答配信: 最終的な充実した出力をユーザーに提供します
特にデータが継続的に生成および更新される環境でこれらのステップを拡張するには、RAG パイプラインのさまざまなコンポーネント間のデータ フローのための効率的で信頼性の高いメカニズムが必要です。
RAG 処理における KubeMQ の重要な役割
大規模な連続データ ストリームの処理
IoT ネットワーク、ソーシャル メディア プラットフォーム、リアルタイム分析システムなどのシナリオでは、新しいデータが絶えず生成され、AI モデルはこの情報を組み込むために迅速に適応する必要があります。従来の要求/応答アーキテクチャは、高スループット条件下ではボトルネックとなり、レイテンシーの問題やパフォーマンスの低下につながる可能性があります。
KubeMQ は、サービス間の効率的なデータ ルーティングのためのスケーラブルで堅牢なインフラストラクチャを提供することで、高スループットのメッセージング シナリオを管理します。 KubeMQ を RAG パイプラインに統合することにより、新しいデータ ポイントがそれぞれメッセージ キューまたはストリームにパブリッシュされ、システムに負荷をかけることなく、取得コンポーネントが最新の情報に即時にアクセスできるようになります。このリアルタイム データ処理機能は、GenAI 出力の関連性と正確性を維持するために非常に重要です。
最適なルーターとして機能します
KubeMQ は、キュー、ストリーム、パブリッシュ/サブスクライブ (pub/sub)、リモート プロシージャ コール (RPC) などのさまざまなメッセージング パターンを提供し、RAG パイプライン内で多用途かつ強力なルーターとなります。その低レイテンシーと高性能特性により、迅速なメッセージ配信が保証されます。これは、遅延がユーザー エクスペリエンスやシステム効率に大きな影響を与える可能性があるリアルタイム GenAI アプリケーションにとって不可欠です。
さらに、複雑なルーティング ロジックを処理できる KubeMQ の機能により、高度なデータ分散戦略が可能になります。これにより、AI システムのさまざまなコンポーネントが、不必要な重複や遅延を発生させることなく、必要なデータを必要なときに正確に受信できるようになります。
FalkorDB を統合してデータ管理を強化
KubeMQ はサービス間でメッセージを効率的にルーティングしますが、FalkorDB は、RAG プロセスに必要な膨大な量のデータを保存および取得するためのスケーラブルで高性能なグラフ データベース ソリューションを提供することでこれを補完します。この統合により、新しいデータが KubeMQ を介して流れると、シームレスに FalkorDB に保存され、レイテンシーやボトルネックを発生させることなく、すぐに取得操作に利用できるようになります。
スケーラビリティと信頼性の強化
GenAI アプリケーションのユーザー ベースとデータ量の両方が増加するにつれて、スケーラビリティが最大の懸念事項になります。 KubeMQ はスケーラブルであり、負荷の増加にシームレスに対応するための水平スケーリングをサポートしています。これにより、RAG プロセスの数が増加したり、データ生成が加速したりしても、メッセージング インフラストラクチャの堅牢性と応答性が維持されます。
さらに、KubeMQ はメッセージの永続性とフォールト トレランスを提供します。システム障害やネットワーク中断が発生した場合、KubeMQ はメッセージが失われず、システムが正常に回復できることを保証します。この信頼性は、ユーザーがタイムリーで正確な情報を得るために依存する AI アプリケーションの整合性を維持する上で非常に重要です。
専用のルーティング サービスの必要性を排除
RAG パイプラインでのデータ処理のためのカスタム ルーティング サービスの実装は、リソースを大量に消費し、複雑になる可能性があります。多くの場合、これらのサービスを構築、維持、拡張するには多大な開発労力が必要となり、コアの AI アプリケーション開発から焦点がそらされてしまいます。
KubeMQ を採用することで、組織はオーダーメイドのルーティング ソリューションを作成する必要がなくなります。 KubeMQ は、複雑なルーティング パターン、メッセージ フィルタリング、優先順位の処理など、RAG プロセスのルーティング ニーズに対処するすぐに使える機能を提供します。これにより、開発とメンテナンスのオーバーヘッドが削減されるだけでなく、GenAI ソリューションの市場投入までの時間が短縮されます。
REST および SDK を介した統合アクセス
KubeMQ は、メッセージ ブローカー機能と対話するための複数のインターフェイスを提供します。
REST API: 言語に依存しない統合を可能にし、任意のプログラミング言語で記述されたサービスが HTTP 経由でメッセージを送受信できるようにします
SDK: さまざまなプログラミング言語 (Python、Java、Go、.NET など) のクライアント ライブラリを提供し、ネイティブ統合を通じてより効率的な通信パターンとパフォーマンスの向上を促進します
この柔軟性により、開発者は特定のユースケースに最適な方法を選択できるため、アーキテクチャが簡素化され、開発サイクルが加速されます。データ ルーティングの単一タッチポイントにより、RAG パイプラインのさまざまなコンポーネント間の通信が合理化され、システム全体の一貫性が強化されます。
RAG パイプラインでの KubeMQ の実装: 詳細な例
このコード例は、KubeMQ を RAG パイプラインに統合して映画情報検索システムを構築する方法を示しています。 GPT-4 を使用してナレッジ グラフを構築するために、Rotten Tomatoes から映画 URL を取り込むサーバーをセットアップします。ユーザーはチャット クライアントを通じてこのシステムと対話し、映画関連のクエリを送信し、AI が生成した応答を受信できます。このユースケースでは、映画のコンテキスト内で効率的なメッセージ処理とサービス間通信に KubeMQ を利用して、実際のアプリケーションで継続的なデータの取り込みとリアルタイムのクエリ処理を処理する方法を示します。
アーキテクチャの概要
データ取り込みサービス: 新しいデータが利用可能になったときにキャプチャして KubeMQ ストリームに公開します
取得サービス: KubeMQ ストリームをサブスクライブして更新を受信し、ナレッジ ベースを更新します
生成サービス: クエリリクエストをリッスンし、AI モデルと対話し、応答を生成します
レスポンス サービス: 生成されたレスポンスを適切なチャネルを通じてユーザーに送り返します
KubeMQ のセットアップ
KubeMQ が動作していることを確認します。これは、Docker を使用して KubeMQ をデプロイすることで実現できます。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
このコマンドは、REST および gRPC 通信に必要なポートを公開して KubeMQ を開始します。
RAGサーバー側
このコード (GitHub リポジトリ) は、チャット クエリを処理し、メッセージ処理に KubeMQ を使用してナレッジ ソースを管理する RAG サーバーを実装します。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
サーバーは 2 つのメイン スレッドを実行します。1 つは「rag-chat-query」と呼ばれるチャネルを通じてチャット クエリをサブスクライブし、GPT-4 のナレッジ グラフを使用して処理します。もう 1 つは「rag-chat-query」と呼ばれるキューから継続的にプルします。 -sources-queue」を使用して、新しいソースをナレッジ グラフに追加します。ナレッジ グラフは、JSON ファイルからロードされたカスタム オントロジーで初期化され、処理に OpenAI の GPT-4 モデルを使用します。サーバーは正常なシャットダウン処理とエラー管理を実装し、サーバーの停止時にすべてのスレッドが適切に終了するようにします。
RAG ナレッジ グラフに取り込むためのソース データの送信
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
このコードは、KubeMQ のキュー システムを通じてムービー URL を RAG サーバーに送信する単純なクライアントを実装します。具体的には、KubeMQ に接続する SourceClient クラスを作成し、RAG サーバーが監視するのと同じキューである「rag-sources-queue」チャネルにメッセージを送信します。メイン プログラムとして実行すると、Rotten Tomatoes の映画 URL (マトリックス映画、ジョン ウィック、スピードを含む) のリストが送信され、RAG サーバーによって処理され、ナレッジ グラフに追加されます。
質問と回答の送受信
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
このコードは、KubeMQ のクエリ システムを通じて RAG サーバーと通信するチャット クライアントを実装します。 ChatClient クラスは、「rag-chat-query」チャネルにメッセージを送信し、クエリごとに 30 秒のタイムアウトで応答を待ちます。メイン プログラムとして実行すると、マトリックスの監督とキアヌ リーブスとの関係に関する 2 つの関連する質問を送信し、受信した各応答を出力することでクライアントの機能を示します。
コードリポジトリ
すべてのコード例は、元の GitHub リポジトリの私のフォークにあります。
結論
KubeMQ を GenAI アプリケーションの RAG パイプラインに統合すると、連続データ ストリームと複雑なプロセス間通信を処理するためのスケーラブルで信頼性が高く、効率的なメカニズムが提供されます。 KubeMQ は、汎用性の高いメッセージング パターンを備えた統合ルーターとして機能することで、アーキテクチャ全体を簡素化し、カスタム ルーティング ソリューションの必要性を減らし、開発サイクルを加速します。
さらに、FalkorDB を組み込むことで、KubeMQ とシームレスに統合された高性能のナレッジ ベースが提供され、データ管理が強化されます。この組み合わせにより、最適化されたデータの取得と保存が保証され、RAG プロセスの動的な要件がサポートされます。
高スループットのシナリオを処理する機能と、永続性やフォールト トレランスなどの機能を組み合わせることで、高負荷下やシステム中断に直面した場合でも、GenAI アプリケーションの応答性と信頼性が確保されます。
KubeMQ と FalkorDB を活用することで、組織はデータ ルーティング インフラストラクチャが堅牢で最新の AI ワークフローの要求を満たすことができると確信して、AI モデルの強化と貴重な洞察とサービスの提供に集中できます。
以上がKubeMQ による GenAI アプリケーションの強化: 検索拡張生成 (RAG) の効率的なスケーリングの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

この記事では、Pythonライブラリである美しいスープを使用してHTMLを解析する方法について説明します。 find()、find_all()、select()、およびget_text()などの一般的な方法は、データ抽出、多様なHTML構造とエラーの処理、および代替案(SEL

LinuxターミナルでPythonバージョンを表示する際の許可の問題の解決策PythonターミナルでPythonバージョンを表示しようとするとき、Pythonを入力してください...

Pythonオブジェクトのシリアル化と脱介入は、非自明のプログラムの重要な側面です。 Pythonファイルに何かを保存すると、構成ファイルを読み取る場合、またはHTTPリクエストに応答する場合、オブジェクトシリアル化と脱滑り化を行います。 ある意味では、シリアル化と脱派化は、世界で最も退屈なものです。これらすべての形式とプロトコルを気にするのは誰ですか? Pythonオブジェクトを維持またはストリーミングし、後で完全に取得したいと考えています。 これは、概念レベルで世界を見るのに最適な方法です。ただし、実用的なレベルでは、選択したシリアル化スキーム、形式、またはプロトコルは、プログラムの速度、セキュリティ、メンテナンスの自由、およびその他の側面を決定する場合があります。

Pythonの統計モジュールは、強力なデータ統計分析機能を提供して、生物統計やビジネス分析などのデータの全体的な特性を迅速に理解できるようにします。データポイントを1つずつ見る代わりに、平均や分散などの統計を見て、無視される可能性のある元のデータの傾向と機能を発見し、大きなデータセットをより簡単かつ効果的に比較してください。 このチュートリアルでは、平均を計算し、データセットの分散の程度を測定する方法を説明します。特に明記しない限り、このモジュールのすべての関数は、単に平均を合計するのではなく、平均()関数の計算をサポートします。 浮動小数点数も使用できます。 ランダムをインポートします インポート統計 fractiから

この記事では、深い学習のためにTensorflowとPytorchを比較しています。 関連する手順、データの準備、モデルの構築、トレーニング、評価、展開について詳しく説明しています。 特に計算グラップに関して、フレームワーク間の重要な違い

このチュートリアルは、単純なツリーナビゲーションを超えたDOM操作に焦点を当てた、美しいスープの以前の紹介に基づいています。 HTML構造を変更するための効率的な検索方法と技術を探ります。 1つの一般的なDOM検索方法はExです

この記事では、numpy、pandas、matplotlib、scikit-learn、tensorflow、django、flask、and requestsなどの人気のあるPythonライブラリについて説明し、科学的コンピューティング、データ分析、視覚化、機械学習、Web開発、Hの使用について説明します。

この記事では、コマンドラインインターフェイス(CLI)の構築に関するPython開発者をガイドします。 Typer、Click、Argparseなどのライブラリを使用して、入力/出力の処理を強調し、CLIの使いやすさを改善するためのユーザーフレンドリーな設計パターンを促進することを詳述しています。


ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

EditPlus 中国語クラック版
サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

VSCode Windows 64 ビットのダウンロード
Microsoft によって発売された無料で強力な IDE エディター

Dreamweaver Mac版
ビジュアル Web 開発ツール

MinGW - Minimalist GNU for Windows
このプロジェクトは osdn.net/projects/mingw に移行中です。引き続きそこでフォローしていただけます。 MinGW: GNU Compiler Collection (GCC) のネイティブ Windows ポートであり、ネイティブ Windows アプリケーションを構築するための自由に配布可能なインポート ライブラリとヘッダー ファイルであり、C99 機能をサポートする MSVC ランタイムの拡張機能が含まれています。すべての MinGW ソフトウェアは 64 ビット Windows プラットフォームで実行できます。

PhpStorm Mac バージョン
最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール

ホットトピック



