搜索
首页后端开发Python教程使用 KubeMQ 增强 GenAI 应用程序:有效扩展检索增强生成 (RAG)

Enhancing GenAI Applications With KubeMQ: Efficiently Scaling Retrieval-Augmented Generation (RAG)

随着生成式人工智能 (GenAI) 在各行业的应用激增,组织越来越多地利用检索增强生成 (RAG) 技术通过实时、上下文丰富的内容来支持其人工智能模型数据。管理此类应用程序中复杂的信息流带来了重大挑战,特别是在处理大规模连续生成的数据时。 KubeMQ 是一个强大的消息代理,作为简化多个 RAG 进程的路由的解决方案而出现,确保 GenAI 应用程序中的高效数据处理。

为了进一步提高 RAG 工作流程的效率和可扩展性,集成 FalkorDB 这样的高性能数据库至关重要。 FalkorDB 为 RAG 系统所依赖的动态知识库提供可靠且可扩展的存储解决方案,确保快速数据检索以及与 KubeMQ 等消息传递系统的无缝集成。

了解 GenAI 工作流程中的 RAG

RAG 是一种通过集成检索机制来增强生成式 AI 模型的范例,允许模型在推理过程中访问外部知识库。这种方法通过将生成的响应基于可用的最新相关信息,显着提高了生成响应的准确性、相关性和及时性。

在使用 RAG 的典型 GenAI 工作流程中,该过程涉及多个步骤:

  1. 查询处理:解释用户的输入以了解意图和上下文

  2. 检索:从动态知识库(例如 FalkorDB)中获取相关文档或数据,确保快速高效地访问最新且相关的信息。

  3. 生成:使用输入和检索到的数据生成响应

  4. 响应交付:向用户提供最终的、丰富的输出

扩展这些步骤,尤其是在数据不断生成和更新的环境中,需要一种高效可靠的机制来在 RAG 管道的各个组件之间传输数据。

KubeMQ 在 RAG 处理中的关键作用

大规模处理连续数据流

在物联网网络、社交媒体平台或实时分析系统等场景中,不断产生新数据,人工智能模型必须迅速适应以合并这些信息。传统的请求-响应架构在高吞吐量条件下可能成为瓶颈,导致延迟问题和性能下降。

KubeMQ 通过提供可扩展且强大的基础设施来管理高吞吐量消息传递场景,以实现服务之间的高效数据路由。通过将 KubeMQ 集成到 RAG 管道中,每个新数据点都会发布到消息队列或流中,确保检索组件可以立即访问最新信息,而不会压垮系统。这种实时数据处理能力对于维持 GenAI 输出的相关性和准确性至关重要。

作为最佳路由器

KubeMQ 提供各种消息传递模式 - 包括队列、流、发布-订阅 (pub/sub) 和远程过程调用 (RPC) - 使其成为 RAG 管道中多功能且功能强大的路由器。其低延迟和高性能特性可确保及时的消息传递,这对于实时 GenAI 应用程序至关重要,因为延迟会严重影响用户体验和系统效率。

此外,KubeMQ 处理复杂路由逻辑的能力允许复杂的数据分发策略。这确保了人工智能系统的不同组件在需要时准确接收所需的数据,而不会出现不必要的重复或延迟。

集成 FalkorDB 以增强数据管理

虽然 KubeMQ 在服务之间有效地路由消息,FalkorDB 通过提供可扩展且高性能的图形数据库解决方案来存储和检索 RAG 流程所需的大量数据来补充这一点。这种集成确保当新数据流经 KubeMQ 时,它会无缝存储在 FalkorDB 中,使其可随时用于检索操作,而不会引入延迟或瓶颈。

增强可扩展性和可靠性

随着 GenAI 应用程序的用户群和数据量不断增长,可扩展性成为最重要的问题。 KubeMQ 具有可扩展性,支持水平扩展以无缝适应增加的负载。它确保随着 RAG 进程数量的增加或数据生成的加速,消息传递基础设施保持稳健和响应能力。

此外,KubeMQ 还提供消息持久化和容错能力。当发生系统故障或网络中断时,KubeMQ 可确保消息不会丢失并且系统可以正常恢复。这种可靠性对于维护人工智能应用程序的完整性至关重要,用户依赖这些应用程序来获取及时、准确的信息。

消除对专用路由服务的需求

在 RAG 管道中实现用于数据处理的自定义路由服务可能会占用大量资源且复杂。通常需要大量的开发工作来构建、维护和扩展这些服务,从而分散了核心人工智能应用程序开发的注意力。

通过采用 KubeMQ,组织无需创建定制路由解决方案。 KubeMQ 提供开箱即用的功能,可满足 RAG 进程的路由需求,包括复杂的路由模式、消息过滤和优先级处理。这不仅减少了开发和维护开销,还加快了 GenAI 解决方案的上市时间。

通过REST和SDK统一访问

KubeMQ 提供了多个与其消息代理功能交互的接口:

  • REST API:支持与语言无关的集成,允许以任何编程语言编写的服务通过 HTTP 发送和接收消息

  • SDK:为各种编程语言(例如 Python、Java、Go 和 .NET)提供客户端库,通过本机集成促进更高效的通信模式和更好的性能

这种灵活性允许开发人员为其特定用例选择最合适的方法,从而简化架构并加快开发周期。数据路由的单一接触点简化了 RAG 管道不同组件之间的通信,从而增强了整体系统的一致性。

在 RAG 管道中实现 KubeMQ:详细示例

代码示例展示了如何通过将 KubeMQ 集成到 RAG 管道来构建电影信息检索系统。它设置了一个服务器,从烂番茄中提取电影 URL,以使用 GPT-4 构建知识图谱。用户可以通过聊天客户端与该系统交互,发送与电影相关的查询并接收人工智能生成的响应。此用例演示了如何在实际应用程序中处理连续数据摄取和实时查询处理,利用 KubeMQ 在电影上下文中进行高效的消息处理和服务间通信。

架构概述

  1. 数据摄取服务:捕获新数据并将其发布到可用的 KubeMQ 流

  2. 检索服务:订阅KubeMQ流以接收更新并刷新知识库

  3. 生成服务:监听查询请求,与AI模型交互,并生成响应

  4. 响应服务:将生成的响应通过适当的渠道发送回用户

设置 KubeMQ

确保 KubeMQ 可以运行,这可以通过使用 Docker 部署来实现:

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

此命令启动 KubeMQ,并为 REST 和 gRPC 通信公开必要的端口。

RAG服务器端

此代码(GitHub 存储库)实现了一个 RAG 服务器,该服务器处理聊天查询并使用 KubeMQ 进行消息处理来管理知识源。

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

服务器运行两个主线程:一个通过名为“rag-chat-query”的通道订阅聊天查询,并使用 GPT-4 的知识图来处理它们,另一个从名为“rag”的队列中持续拉取-sources-queue”将新源添加到知识图谱中。知识图谱使用从 JSON 文件加载的自定义本体进行初始化,并使用 OpenAI 的 GPT-4 模型进行处理。服务器实现了优雅的关闭处理和错误管理,确保服务器停止时所有线程都正确终止。

发送源数据以摄取到 RAG 知识图谱中

# server.py

import json
import threading
from typing import List

from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL

class RAGServer:
   def __init__(self):
       self.cq_client = CQClient(address="localhost:50000")
       self.queues_client = QueuesClient(address="localhost:50000")
       model = OpenAiGenerativeModel(model_name="gpt-4o")
       with open("ontology.json", "r") as f:
           ontology = json.load(f)
       ontology = Ontology.from_json(ontology)
       self.kg = KnowledgeGraph(
           name="movies",
           model_config=KnowledgeGraphModelConfig.with_model(model),
           ontology=ontology)
       self.chat = self.kg.chat_session()
       self.shutdown_event = threading.Event()
       self.threads: List[threading.Thread] = []

   def handle_chat(self, request: QueryMessageReceived):
       try:
           message = request.body.decode('utf-8')
           print(f"Received chat message: {message}")
           result= self.chat.send_message(message)
           answer = result.get("response","No answer")
           print(f"Chat response: {answer}")
           response = QueryResponseMessage(
               query_received=request,
               is_executed=True,
               body=answer.encode('utf-8')
           )
           self.cq_client.send_response_message(response)
       except Exception as e:
           print(f"Error processing chat message: {str(e)}")
           self.cq_client.send_response_message(QueryResponseMessage(
               query_received=request,
               is_executed=False,
               error=str(e)
           ))

   def pull_from_queue(self):
       while not self.shutdown_event.is_set():
           try:
               result = self.queues_client.pull("rag-sources-queue", 10, 1)
               if result.is_error:
                   print(f"Error pulling message from queue: {result.error}")
                   continue
               sources = []
               for message in result.messages:
                   source = message.body.decode('utf-8')
                   print(f"Received source: {source}, adding to knowledge graph")
                   sources.append(URL(message.body.decode('utf-8')))
               if sources:
                   self.kg.process_sources(sources)
           except Exception as e:
               if not self.shutdown_event.is_set():  # Only log if not shutting down
                   print(f"Error processing sources: {str(e)}")

   def subscribe_to_chat_queries(self):
       def on_error(err: str):
           if not self.shutdown_event.is_set():  # Only log if not shutting down
               print(f"Error: {err}")

       cancellation_token = CancellationToken()

       try:
           self.cq_client.subscribe_to_queries(
               subscription=QueriesSubscription(
                   channel="rag-chat-query",
                   on_receive_query_callback=self.handle_chat,
                   on_error_callback=on_error,
               ),
               cancel=cancellation_token
           )

           # Wait for shutdown signal
           while not self.shutdown_event.is_set():
               time.sleep(0.1)


           # Cancel subscription when shutdown is requested
           cancellation_token.cancel()

       except Exception as e:
           if not self.shutdown_event.is_set():
               print(f"Error in subscription thread: {str(e)}")
   def run(self):

       chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
       queue_thread = threading.Thread(target=self.pull_from_queue)

       self.threads.extend([chat_thread, queue_thread])

       for thread in self.threads:
           thread.daemon = True  # Make threads daemon so they exit when main thread exits
           thread.start()

       print("RAG server started")
       try:
           while True:
               time.sleep(1)
       except KeyboardInterrupt:
           print("\nShutting down gracefully...")
           self.shutdown()
           self.cq_client.close()
           self.queues_client.close()

   def shutdown(self):

       print("Initiating shutdown sequence...")
       self.shutdown_event.set()  # Signal all threads to stop

       for thread in self.threads:
           thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
           if thread.is_alive():
               print(f"Warning: Thread {thread.name} did not shutdown cleanly")

       print("Shutdown complete")
if __name__ == "__main__":
   rag_server = RAGServer()
   rag_server.run()

此代码实现了一个简单的客户端,通过 KubeMQ 的队列系统将电影 URL 发送到 RAG 服务器。具体来说,它创建一个连接到 KubeMQ 的 SourceClient 类,并将消息发送到“rag-sources-queue”通道,该通道与 RAG 服务器监控的队列相同。当作为主程序运行时,它会发送一个烂番茄电影 URL 列表(包括《黑客帝国》电影、《疾速追杀》和《生死时速》),由 RAG 服务器处理并添加到知识图谱中。

发送和接收问题和解答

# sources_client.py

from kubemq.queues import *

class SourceClient:
   def __init__(self, address="localhost:50000"):
       self.client = Client(address=address)

   def send_source(self, message: str) :
       send_result = self.client.send_queues_message(
           QueueMessage(
               channel="rag-sources-queue",
               body=message.encode("utf-8"),
           )
       )
       if send_result.is_error:
           print(f"message send error, error:{send_result.error}")

if __name__ == "__main__":
   client = SourceClient()
   urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
       "https://www.rottentomatoes.com/m/matrix",
       "https://www.rottentomatoes.com/m/matrix_revolutions",
       "https://www.rottentomatoes.com/m/matrix_reloaded",
       "https://www.rottentomatoes.com/m/speed_1994",
       "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
   for url in urls:
       client.send_source(url)
   print("done")

此代码实现了一个聊天客户端,通过 KubeMQ 的查询系统与 RAG 服务器进行通信。 ChatClient 类将消息发送到“rag-chat-query”通道并等待响应,每个查询有 30 秒的超时时间。当作为主程序运行时,它通过发送两个有关《黑客帝国》导演及其与基努·里维斯的联系的相关问题来演示客户端的功能,并在收到问题时打印每个响应。

代码库

所有代码示例都可以在我的原始 GitHub 存储库的分支中找到。

结论

将 KubeMQ 集成到 GenAI 应用程序的 RAG 管道中,为处理连续数据流和复杂的进程间通信提供了可扩展、可靠且高效的机制。通过充当具有多种消息传递模式的统一路由器,KubeMQ 简化了整体架构,减少了对自定义路由解决方案的需求,并加快了开发周期。

此外,合并 FalkorDB 通过提供与 KubeMQ 无缝集成的高性能知识库来增强数据管理。这种组合可确保优化数据检索和存储,支持 RAG 流程的动态要求。

处理高吞吐量场景的能力,与持久性和容错等功能相结合,确保 GenAI 应用程序即使在重负载或面临系统中断的情况下也能保持响应能力和可靠性。

通过利用 KubeMQ 和 FalkorDB,组织可以专注于增强其 AI 模型并提供有价值的见解和服务,并确信其数据路由基础设施强大且能够满足现代 AI 工作流程的需求。

以上是使用 KubeMQ 增强 GenAI 应用程序:有效扩展检索增强生成 (RAG)的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
如何解决Linux终端中查看Python版本时遇到的权限问题?如何解决Linux终端中查看Python版本时遇到的权限问题?Apr 01, 2025 pm 05:09 PM

Linux终端中查看Python版本时遇到权限问题的解决方法当你在Linux终端中尝试查看Python的版本时,输入python...

我如何使用美丽的汤来解析HTML?我如何使用美丽的汤来解析HTML?Mar 10, 2025 pm 06:54 PM

本文解释了如何使用美丽的汤库来解析html。 它详细介绍了常见方法,例如find(),find_all(),select()和get_text(),以用于数据提取,处理不同的HTML结构和错误以及替代方案(SEL)

Python中的数学模块:统计Python中的数学模块:统计Mar 09, 2025 am 11:40 AM

Python的statistics模块提供强大的数据统计分析功能,帮助我们快速理解数据整体特征,例如生物统计学和商业分析等领域。无需逐个查看数据点,只需查看均值或方差等统计量,即可发现原始数据中可能被忽略的趋势和特征,并更轻松、有效地比较大型数据集。 本教程将介绍如何计算平均值和衡量数据集的离散程度。除非另有说明,本模块中的所有函数都支持使用mean()函数计算平均值,而非简单的求和平均。 也可使用浮点数。 import random import statistics from fracti

如何使用TensorFlow或Pytorch进行深度学习?如何使用TensorFlow或Pytorch进行深度学习?Mar 10, 2025 pm 06:52 PM

本文比较了Tensorflow和Pytorch的深度学习。 它详细介绍了所涉及的步骤:数据准备,模型构建,培训,评估和部署。 框架之间的关键差异,特别是关于计算刻度的

哪些流行的Python库及其用途?哪些流行的Python库及其用途?Mar 21, 2025 pm 06:46 PM

本文讨论了诸如Numpy,Pandas,Matplotlib,Scikit-Learn,Tensorflow,Tensorflow,Django,Blask和请求等流行的Python库,并详细介绍了它们在科学计算,数据分析,可视化,机器学习,网络开发和H中的用途

在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中?在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中?Apr 01, 2025 pm 11:15 PM

在使用Python的pandas库时,如何在两个结构不同的DataFrame之间进行整列复制是一个常见的问题。假设我们有两个Dat...

如何使用Python创建命令行接口(CLI)?如何使用Python创建命令行接口(CLI)?Mar 10, 2025 pm 06:48 PM

本文指导Python开发人员构建命令行界面(CLIS)。 它使用Typer,Click和ArgParse等库详细介绍,强调输入/输出处理,并促进用户友好的设计模式,以提高CLI可用性。

解释Python中虚拟环境的目的。解释Python中虚拟环境的目的。Mar 19, 2025 pm 02:27 PM

文章讨论了虚拟环境在Python中的作用,重点是管理项目依赖性并避免冲突。它详细介绍了他们在改善项目管理和减少依赖问题方面的创建,激活和利益。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。

SublimeText3 英文版

SublimeText3 英文版

推荐:为Win版本,支持代码提示!

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),