隨著生成式人工智慧(GenAI) 在各行業的應用激增,組織越來越多地利用檢索增強生成(RAG) 技術透過即時、上下文豐富的內容來支援其人工智慧模型數據。管理此類應用程式中複雜的資訊流帶來了重大挑戰,特別是在處理大規模連續生成的資料時。 KubeMQ 是一個強大的訊息代理,作為簡化多個 RAG 進程的路由的解決方案而出現,確保 GenAI 應用程式中的高效資料處理。
為了進一步提高 RAG 工作流程的效率和可擴展性,整合 FalkorDB 這樣的高效能資料庫至關重要。 FalkorDB 為 RAG 系統所依賴的動態知識庫提供可靠且可擴展的儲存解決方案,確保快速資料檢索以及與 KubeMQ 等訊息傳遞系統的無縫整合。
了解 GenAI 工作流程中的 RAG
RAG 是一種透過整合檢索機制來增強生成式 AI 模型的範例,可讓模型在推理過程中存取外部知識庫。這種方法透過將產生的回應基於可用的最新相關信息,顯著提高了產生回應的準確性、相關性和及時性。
在使用 RAG 的典型 GenAI 工作流程中,該過程涉及多個步驟:
查詢處理:解釋使用者的輸入以了解意圖和上下文
檢索:從動態知識庫(例如 FalkorDB)中取得相關文件或數據,確保快速且有效率地存取最新且相關的資訊。
產生:使用輸入和檢索到的資料產生回應
回應交付:提供使用者最終的、豐富的輸出
擴展這些步驟,尤其是在資料不斷產生和更新的環境中,需要一種高效可靠的機制來在 RAG 管道的各個組件之間傳輸資料。
KubeMQ 在 RAG 處理中的關鍵作用
大規模處理連續資料流
在物聯網網路、社群媒體平台或即時分析系統等場景中,不斷產生新數據,人工智慧模型必須迅速適應以合併這些資訊。傳統的請求-回應架構在高吞吐量條件下可能成為瓶頸,導致延遲問題和效能下降。
KubeMQ 透過提供可擴展且強大的基礎設施來管理高吞吐量訊息傳遞場景,以實現服務之間的高效資料路由。透過將 KubeMQ 整合到 RAG 管道中,每個新資料點都會發佈到訊息佇列或串流中,確保檢索元件可以立即存取最新訊息,而不會壓垮系統。這種即時數據處理能力對於維持 GenAI 輸出的相關性和準確性至關重要。
作為最佳路由器
KubeMQ 提供各種訊息傳遞模式 - 包括佇列、串流、發布-訂閱 (pub/sub) 和遠端過程呼叫 (RPC) - 使其成為 RAG 管道中多功能且功能強大的路由器。其低延遲和高效能特性可確保及時的訊息傳遞,這對於即時 GenAI 應用程式至關重要,因為延遲會嚴重影響使用者體驗和系統效率。
此外,KubeMQ 處理複雜路由邏輯的能力允許複雜的資料分發策略。這確保了人工智慧系統的不同組件在需要時準確接收所需的數據,而不會出現不必要的重複或延遲。
整合 FalkorDB 以增強資料管理
雖然 KubeMQ 在服務之間有效地路由訊息,FalkorDB 透過提供可擴展且高效能的圖形資料庫解決方案來儲存和檢索 RAG 流程所需的大量資料來補充這一點。這種整合確保當新資料流經 KubeMQ 時,它會無縫儲存在 FalkorDB 中,使其可隨時用於檢索操作,而不會引入延遲或瓶頸。
增強可擴展性和可靠性
隨著 GenAI 應用程式的用戶群和資料量不斷增長,可擴展性成為最重要的問題。 KubeMQ 具有可擴展性,支援水平擴展以無縫適應增加的負載。它確保隨著 RAG 進程數量的增加或資料產生的加速,訊息傳遞基礎設施保持穩健和回應能力。
此外,KubeMQ 還提供訊息持久化和容錯能力。當發生系統故障或網路中斷時,KubeMQ 可確保訊息不會遺失且系統可以正常復原。這種可靠性對於維護人工智慧應用程式的完整性至關重要,用戶依賴這些應用程式來獲取及時、準確的資訊。
消除專用路由服務的需求
在 RAG 管道中實作用於資料處理的自訂路由服務可能會佔用大量資源且複雜。通常需要大量的開發工作來建立、維護和擴展這些服務,從而分散了核心人工智慧應用程式開發的注意力。
透過採用 KubeMQ,組織無需建立客製化路由解決方案。 KubeMQ 提供開箱即用的功能,可滿足 RAG 程序的路由需求,包括複雜的路由模式、訊息過濾和優先權處理。這不僅減少了開發和維護開銷,還加快了 GenAI 解決方案的上市時間。
透過REST和SDK統一訪問
KubeMQ 提供了多個與其訊息代理功能互動的介面:
REST API:支援與語言無關的集成,允許以任何程式語言編寫的服務透過 HTTP 發送和接收訊息
SDK:為各種程式語言(例如Python、Java、Go 和.NET)提供用戶端程式庫,透過本機整合促進更有效率的通訊模式和更好的效能
這種靈活性允許開發人員為其特定用例選擇最合適的方法,從而簡化架構並加快開發週期。資料路由的單一接觸點簡化了 RAG 管道不同組件之間的通信,從而增強了整體系統的一致性。
在 RAG 管道中實現 KubeMQ:詳細範例
程式碼範例展示如何透過將 KubeMQ 整合到 RAG 管道來建立電影資訊檢索系統。它設定了一個伺服器,從爛番茄中提取電影 URL,以使用 GPT-4 建立知識圖譜。用戶可以透過聊天用戶端與該系統交互,發送與電影相關的查詢並接收人工智慧產生的回應。此用例示範如何在實際應用程式中處理連續資料攝取和即時查詢處理,利用 KubeMQ 在影片上下文中進行高效的訊息處理和服務間通訊。
架構概述
資料攝取服務:擷取新資料並發佈到可用的 KubeMQ 串流
檢索服務:訂閱KubeMQ流以接收更新並刷新知識庫
產生服務:監聽查詢請求,與AI模型交互,並產生回應
回應服務:將產生的回應透過適當的管道傳回使用者
設定 KubeMQ
確保 KubeMQ 可以運行,這可以透過使用 Docker 部署來實現:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
此指令啟動 KubeMQ,並為 REST 和 gRPC 通訊公開必要的連接埠。
RAG伺服器端
此程式碼(GitHub 儲存庫)實作了一個 RAG 伺服器,該伺服器處理聊天查詢並使用 KubeMQ 進行訊息處理來管理知識來源。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
伺服器執行兩個主執行緒:一個透過名為「rag-chat-query」的通道訂閱聊天查詢,並使用GPT-4 的知識圖來處理它們,另一個從名為「rag」的佇列中持續拉取-sources-queue」將新來源加入知識圖譜。知識圖譜使用從JSON 檔案載入的自訂本體進行初始化,並使用OpenAI 的GPT-4模型進行處理。
發送來源資料以攝取到 RAG 知識圖譜中# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()此程式碼實作了一個簡單的用戶端,透過 KubeMQ 的佇列系統將影片 URL 傳送到 RAG 伺服器。具體來說,它會建立一個連接到 KubeMQ 的 SourceClient 類,並將訊息傳送到「rag-sources-queue」通道,該通道與 RAG 伺服器監控的佇列相同。當作為主程式運行時,它會發送一個爛番茄電影 URL 清單(包括《駭客任務》電影、《疾速追殺》和《生死時速》),由 RAG 伺服器處理並添加到知識圖譜中。
發送和接收問題和解答
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")此程式碼實作了一個聊天用戶端,透過 KubeMQ 的查詢系統與 RAG 伺服器進行通訊。 ChatClient 類別將訊息傳送到「rag-chat-query」通道並等待回應,每個查詢都有 30 秒的逾時時間。當作為主程式運行時,它會透過發送兩個有關《駭客任務》導演及其與基努·裡維斯的聯繫的相關問題來演示客戶端的功能,並在收到問題時列印每個回應。
程式碼庫
所有程式碼範例都可以在我的原始 GitHub 儲存庫的分支中找到。
結論
將 KubeMQ 整合到 GenAI 應用程式的 RAG 管道中,為處理連續資料流和複雜的進程間通訊提供了可擴展、可靠且高效的機制。透過充當具有多種訊息傳遞模式的統一路由器,KubeMQ 簡化了整體架構,減少了對自訂路由解決方案的需求,並加快了開發週期。
此外,合併 FalkorDB 透過提供與 KubeMQ 無縫整合的高效能知識庫來增強資料管理。這種組合可確保優化資料檢索和存儲,支援 RAG 流程的動態要求。
處理高吞吐量場景的能力,與持久性和容錯等功能相結合,確保 GenAI 應用程式即使在重負載或面臨系統中斷的情況下也能保持響應能力和可靠性。
透過利用 KubeMQ 和 FalkorDB,組織可以專注於增強其 AI 模型並提供有價值的見解和服務,並確信其資料路由基礎設施強大且能夠滿足現代 AI 工作流程的需求。
以上是使用 KubeMQ 增強 GenAI 應用程式:有效擴展檢索增強生成 (RAG)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本文解釋瞭如何使用美麗的湯庫來解析html。 它詳細介紹了常見方法,例如find(),find_all(),select()和get_text(),以用於數據提取,處理不同的HTML結構和錯誤以及替代方案(SEL)

Python的statistics模塊提供強大的數據統計分析功能,幫助我們快速理解數據整體特徵,例如生物統計學和商業分析等領域。無需逐個查看數據點,只需查看均值或方差等統計量,即可發現原始數據中可能被忽略的趨勢和特徵,並更輕鬆、有效地比較大型數據集。 本教程將介紹如何計算平均值和衡量數據集的離散程度。除非另有說明,本模塊中的所有函數都支持使用mean()函數計算平均值,而非簡單的求和平均。 也可使用浮點數。 import random import statistics from fracti

Python 對象的序列化和反序列化是任何非平凡程序的關鍵方面。如果您將某些內容保存到 Python 文件中,如果您讀取配置文件,或者如果您響應 HTTP 請求,您都會進行對象序列化和反序列化。 從某種意義上說,序列化和反序列化是世界上最無聊的事情。誰會在乎所有這些格式和協議?您想持久化或流式傳輸一些 Python 對象,並在以後完整地取回它們。 這是一種在概念層面上看待世界的好方法。但是,在實際層面上,您選擇的序列化方案、格式或協議可能會決定程序運行的速度、安全性、維護狀態的自由度以及與其他系

本文比較了Tensorflow和Pytorch的深度學習。 它詳細介紹了所涉及的步驟:數據準備,模型構建,培訓,評估和部署。 框架之間的關鍵差異,特別是關於計算刻度的

本文討論了諸如Numpy,Pandas,Matplotlib,Scikit-Learn,Tensorflow,Tensorflow,Django,Blask和請求等流行的Python庫,並詳細介紹了它們在科學計算,數據分析,可視化,機器學習,網絡開發和H中的用途

本文指導Python開發人員構建命令行界面(CLIS)。 它使用Typer,Click和ArgParse等庫詳細介紹,強調輸入/輸出處理,並促進用戶友好的設計模式,以提高CLI可用性。

該教程建立在先前對美麗湯的介紹基礎上,重點是簡單的樹導航之外的DOM操縱。 我們將探索有效的搜索方法和技術,以修改HTML結構。 一種常見的DOM搜索方法是EX

文章討論了虛擬環境在Python中的作用,重點是管理項目依賴性並避免衝突。它詳細介紹了他們在改善項目管理和減少依賴問題方面的創建,激活和利益。


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

SAP NetWeaver Server Adapter for Eclipse
將Eclipse與SAP NetWeaver應用伺服器整合。

SublimeText3 Linux新版
SublimeText3 Linux最新版

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

WebStorm Mac版
好用的JavaScript開發工具

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器