隨著生成式人工智慧(GenAI) 在各行業的應用激增,組織越來越多地利用檢索增強生成(RAG) 技術透過即時、上下文豐富的內容來支援其人工智慧模型數據。管理此類應用程式中複雜的資訊流帶來了重大挑戰,特別是在處理大規模連續生成的資料時。 KubeMQ 是一個強大的訊息代理,作為簡化多個 RAG 進程的路由的解決方案而出現,確保 GenAI 應用程式中的高效資料處理。
為了進一步提高 RAG 工作流程的效率和可擴展性,整合 FalkorDB 這樣的高效能資料庫至關重要。 FalkorDB 為 RAG 系統所依賴的動態知識庫提供可靠且可擴展的儲存解決方案,確保快速資料檢索以及與 KubeMQ 等訊息傳遞系統的無縫整合。
RAG 是一種透過整合檢索機制來增強生成式 AI 模型的範例,可讓模型在推理過程中存取外部知識庫。這種方法透過將產生的回應基於可用的最新相關信息,顯著提高了產生回應的準確性、相關性和及時性。
在使用 RAG 的典型 GenAI 工作流程中,該過程涉及多個步驟:
查詢處理:解釋使用者的輸入以了解意圖和上下文
檢索:從動態知識庫(例如 FalkorDB)中取得相關文件或數據,確保快速且有效率地存取最新且相關的資訊。
產生:使用輸入和檢索到的資料產生回應
回應交付:提供使用者最終的、豐富的輸出
擴展這些步驟,尤其是在資料不斷產生和更新的環境中,需要一種高效可靠的機制來在 RAG 管道的各個組件之間傳輸資料。
在物聯網網路、社群媒體平台或即時分析系統等場景中,不斷產生新數據,人工智慧模型必須迅速適應以合併這些資訊。傳統的請求-回應架構在高吞吐量條件下可能成為瓶頸,導致延遲問題和效能下降。
KubeMQ 透過提供可擴展且強大的基礎設施來管理高吞吐量訊息傳遞場景,以實現服務之間的高效資料路由。透過將 KubeMQ 整合到 RAG 管道中,每個新資料點都會發佈到訊息佇列或串流中,確保檢索元件可以立即存取最新訊息,而不會壓垮系統。這種即時數據處理能力對於維持 GenAI 輸出的相關性和準確性至關重要。
KubeMQ 提供各種訊息傳遞模式 - 包括佇列、串流、發布-訂閱 (pub/sub) 和遠端過程呼叫 (RPC) - 使其成為 RAG 管道中多功能且功能強大的路由器。其低延遲和高效能特性可確保及時的訊息傳遞,這對於即時 GenAI 應用程式至關重要,因為延遲會嚴重影響使用者體驗和系統效率。
此外,KubeMQ 處理複雜路由邏輯的能力允許複雜的資料分發策略。這確保了人工智慧系統的不同組件在需要時準確接收所需的數據,而不會出現不必要的重複或延遲。
雖然 KubeMQ 在服務之間有效地路由訊息,FalkorDB 透過提供可擴展且高效能的圖形資料庫解決方案來儲存和檢索 RAG 流程所需的大量資料來補充這一點。這種整合確保當新資料流經 KubeMQ 時,它會無縫儲存在 FalkorDB 中,使其可隨時用於檢索操作,而不會引入延遲或瓶頸。
隨著 GenAI 應用程式的用戶群和資料量不斷增長,可擴展性成為最重要的問題。 KubeMQ 具有可擴展性,支援水平擴展以無縫適應增加的負載。它確保隨著 RAG 進程數量的增加或資料產生的加速,訊息傳遞基礎設施保持穩健和回應能力。
此外,KubeMQ 還提供訊息持久化和容錯能力。當發生系統故障或網路中斷時,KubeMQ 可確保訊息不會遺失且系統可以正常復原。這種可靠性對於維護人工智慧應用程式的完整性至關重要,用戶依賴這些應用程式來獲取及時、準確的資訊。
在 RAG 管道中實作用於資料處理的自訂路由服務可能會佔用大量資源且複雜。通常需要大量的開發工作來建立、維護和擴展這些服務,從而分散了核心人工智慧應用程式開發的注意力。
透過採用 KubeMQ,組織無需建立客製化路由解決方案。 KubeMQ 提供開箱即用的功能,可滿足 RAG 程序的路由需求,包括複雜的路由模式、訊息過濾和優先權處理。這不僅減少了開發和維護開銷,還加快了 GenAI 解決方案的上市時間。
KubeMQ 提供了多個與其訊息代理功能互動的介面:
REST API:支援與語言無關的集成,允許以任何程式語言編寫的服務透過 HTTP 發送和接收訊息
SDK:為各種程式語言(例如Python、Java、Go 和.NET)提供用戶端程式庫,透過本機整合促進更有效率的通訊模式和更好的效能
這種靈活性允許開發人員為其特定用例選擇最合適的方法,從而簡化架構並加快開發週期。資料路由的單一接觸點簡化了 RAG 管道不同組件之間的通信,從而增強了整體系統的一致性。
程式碼範例展示如何透過將 KubeMQ 整合到 RAG 管道來建立電影資訊檢索系統。它設定了一個伺服器,從爛番茄中提取電影 URL,以使用 GPT-4 建立知識圖譜。用戶可以透過聊天用戶端與該系統交互,發送與電影相關的查詢並接收人工智慧產生的回應。此用例示範如何在實際應用程式中處理連續資料攝取和即時查詢處理,利用 KubeMQ 在影片上下文中進行高效的訊息處理和服務間通訊。
資料攝取服務:擷取新資料並發佈到可用的 KubeMQ 串流
檢索服務:訂閱KubeMQ流以接收更新並刷新知識庫
產生服務:監聽查詢請求,與AI模型交互,並產生回應
回應服務:將產生的回應透過適當的管道傳回使用者
確保 KubeMQ 可以運行,這可以透過使用 Docker 部署來實現:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
此指令啟動 KubeMQ,並為 REST 和 gRPC 通訊公開必要的連接埠。
此程式碼(GitHub 儲存庫)實作了一個 RAG 伺服器,該伺服器處理聊天查詢並使用 KubeMQ 進行訊息處理來管理知識來源。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
伺服器執行兩個主執行緒:一個透過名為「rag-chat-query」的通道訂閱聊天查詢,並使用GPT-4 的知識圖來處理它們,另一個從名為「rag」的佇列中持續拉取-sources-queue」將新來源加入知識圖譜。知識圖譜使用從JSON 檔案載入的自訂本體進行初始化,並使用OpenAI 的GPT-4模型進行處理。
發送來源資料以攝取到 RAG 知識圖譜中# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()此程式碼實作了一個簡單的用戶端,透過 KubeMQ 的佇列系統將影片 URL 傳送到 RAG 伺服器。具體來說,它會建立一個連接到 KubeMQ 的 SourceClient 類,並將訊息傳送到「rag-sources-queue」通道,該通道與 RAG 伺服器監控的佇列相同。當作為主程式運行時,它會發送一個爛番茄電影 URL 清單(包括《駭客任務》電影、《疾速追殺》和《生死時速》),由 RAG 伺服器處理並添加到知識圖譜中。
發送和接收問題和解答
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")此程式碼實作了一個聊天用戶端,透過 KubeMQ 的查詢系統與 RAG 伺服器進行通訊。 ChatClient 類別將訊息傳送到「rag-chat-query」通道並等待回應,每個查詢都有 30 秒的逾時時間。當作為主程式運行時,它會透過發送兩個有關《駭客任務》導演及其與基努·裡維斯的聯繫的相關問題來演示客戶端的功能,並在收到問題時列印每個回應。
程式碼庫
結論
此外,合併 FalkorDB 透過提供與 KubeMQ 無縫整合的高效能知識庫來增強資料管理。這種組合可確保優化資料檢索和存儲,支援 RAG 流程的動態要求。
處理高吞吐量場景的能力,與持久性和容錯等功能相結合,確保 GenAI 應用程式即使在重負載或面臨系統中斷的情況下也能保持響應能力和可靠性。
透過利用 KubeMQ 和 FalkorDB,組織可以專注於增強其 AI 模型並提供有價值的見解和服務,並確信其資料路由基礎設施強大且能夠滿足現代 AI 工作流程的需求。
以上是使用 KubeMQ 增強 GenAI 應用程式:有效擴展檢索增強生成 (RAG)的詳細內容。更多資訊請關注PHP中文網其他相關文章!