搜索
首页后端开发Python教程使用 KubeMQ 增强 GenAI 应用程序:有效扩展检索增强生成 (RAG)

Enhancing GenAI Applications With KubeMQ: Efficiently Scaling Retrieval-Augmented Generation (RAG)

随着生成式人工智能 (GenAI) 在各行业的应用激增,组织越来越多地利用检索增强生成 (RAG) 技术通过实时、上下文丰富的内容来支持其人工智能模型数据。管理此类应用程序中复杂的信息流带来了重大挑战,特别是在处理大规模连续生成的数据时。 KubeMQ 是一个强大的消息代理,作为简化多个 RAG 进程的路由的解决方案而出现,确保 GenAI 应用程序中的高效数据处理。

为了进一步提高 RAG 工作流程的效率和可扩展性,集成 FalkorDB 这样的高性能数据库至关重要。 FalkorDB 为 RAG 系统所依赖的动态知识库提供可靠且可扩展的存储解决方案,确保快速数据检索以及与 KubeMQ 等消息传递系统的无缝集成。

了解 GenAI 工作流程中的 RAG

RAG 是一种通过集成检索机制来增强生成式 AI 模型的范例,允许模型在推理过程中访问外部知识库。这种方法通过将生成的响应基于可用的最新相关信息,显着提高了生成响应的准确性、相关性和及时性。

在使用 RAG 的典型 GenAI 工作流程中,该过程涉及多个步骤:

  1. 查询处理:解释用户的输入以了解意图和上下文

  2. 检索:从动态知识库(例如 FalkorDB)中获取相关文档或数据,确保快速高效地访问最新且相关的信息。

  3. 生成:使用输入和检索到的数据生成响应

  4. 响应交付:向用户提供最终的、丰富的输出

扩展这些步骤,尤其是在数据不断生成和更新的环境中,需要一种高效可靠的机制来在 RAG 管道的各个组件之间传输数据。

KubeMQ 在 RAG 处理中的关键作用

大规模处理连续数据流

在物联网网络、社交媒体平台或实时分析系统等场景中,不断产生新数据,人工智能模型必须迅速适应以合并这些信息。传统的请求-响应架构在高吞吐量条件下可能成为瓶颈,导致延迟问题和性能下降。

KubeMQ 通过提供可扩展且强大的基础设施来管理高吞吐量消息传递场景,以实现服务之间的高效数据路由。通过将 KubeMQ 集成到 RAG 管道中,每个新数据点都会发布到消息队列或流中,确保检索组件可以立即访问最新信息,而不会压垮系统。这种实时数据处理能力对于维持 GenAI 输出的相关性和准确性至关重要。

作为最佳路由器

KubeMQ 提供各种消息传递模式 - 包括队列、流、发布-订阅 (pub/sub) 和远程过程调用 (RPC) - 使其成为 RAG 管道中多功能且功能强大的路由器。其低延迟和高性能特性可确保及时的消息传递,这对于实时 GenAI 应用程序至关重要,因为延迟会严重影响用户体验和系统效率。

此外,KubeMQ 处理复杂路由逻辑的能力允许复杂的数据分发策略。这确保了人工智能系统的不同组件在需要时准确接收所需的数据,而不会出现不必要的重复或延迟。

集成 FalkorDB 以增强数据管理

虽然 KubeMQ 在服务之间有效地路由消息,FalkorDB 通过提供可扩展且高性能的图形数据库解决方案来存储和检索 RAG 流程所需的大量数据来补充这一点。这种集成确保当新数据流经 KubeMQ 时,它会无缝存储在 FalkorDB 中,使其可随时用于检索操作,而不会引入延迟或瓶颈。

增强可扩展性和可靠性

随着 GenAI 应用程序的用户群和数据量不断增长,可扩展性成为最重要的问题。 KubeMQ 具有可扩展性,支持水平扩展以无缝适应增加的负载。它确保随着 RAG 进程数量的增加或数据生成的加速,消息传递基础设施保持稳健和响应能力。

此外,KubeMQ 还提供消息持久化和容错能力。当发生系统故障或网络中断时,KubeMQ 可确保消息不会丢失并且系统可以正常恢复。这种可靠性对于维护人工智能应用程序的完整性至关重要,用户依赖这些应用程序来获取及时、准确的信息。

消除对专用路由服务的需求

在 RAG 管道中实现用于数据处理的自定义路由服务可能会占用大量资源且复杂。通常需要大量的开发工作来构建、维护和扩展这些服务,从而分散了核心人工智能应用程序开发的注意力。

通过采用 KubeMQ,组织无需创建定制路由解决方案。 KubeMQ 提供开箱即用的功能,可满足 RAG 进程的路由需求,包括复杂的路由模式、消息过滤和优先级处理。这不仅减少了开发和维护开销,还加快了 GenAI 解决方案的上市时间。

通过REST和SDK统一访问

KubeMQ 提供了多个与其消息代理功能交互的接口:

  • REST API:支持与语言无关的集成,允许以任何编程语言编写的服务通过 HTTP 发送和接收消息

  • SDK:为各种编程语言(例如 Python、Java、Go 和 .NET)提供客户端库,通过本机集成促进更高效的通信模式和更好的性能

这种灵活性允许开发人员为其特定用例选择最合适的方法,从而简化架构并加快开发周期。数据路由的单一接触点简化了 RAG 管道不同组件之间的通信,从而增强了整体系统的一致性。

在 RAG 管道中实现 KubeMQ:详细示例

代码示例展示了如何通过将 KubeMQ 集成到 RAG 管道来构建电影信息检索系统。它设置了一个服务器,从烂番茄中提取电影 URL,以使用 GPT-4 构建知识图谱。用户可以通过聊天客户端与该系统交互,发送与电影相关的查询并接收人工智能生成的响应。此用例演示了如何在实际应用程序中处理连续数据摄取和实时查询处理,利用 KubeMQ 在电影上下文中进行高效的消息处理和服务间通信。

架构概述

  1. 数据摄取服务:捕获新数据并将其发布到可用的 KubeMQ 流

  2. 检索服务:订阅KubeMQ流以接收更新并刷新知识库

  3. 生成服务:监听查询请求,与AI模型交互,并生成响应

  4. 响应服务:将生成的响应通过适当的渠道发送回用户

设置 KubeMQ

确保 KubeMQ 可以运行,这可以通过使用 Docker 部署来实现:

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

此命令启动 KubeMQ,并为 REST 和 gRPC 通信公开必要的端口。

RAG服务器端

此代码(GitHub 存储库)实现了一个 RAG 服务器,该服务器处理聊天查询并使用 KubeMQ 进行消息处理来管理知识源。

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

服务器运行两个主线程:一个通过名为“rag-chat-query”的通道订阅聊天查询,并使用 GPT-4 的知识图来处理它们,另一个从名为“rag”的队列中持续拉取-sources-queue”将新源添加到知识图谱中。知识图谱使用从 JSON 文件加载的自定义本体进行初始化,并使用 OpenAI 的 GPT-4 模型进行处理。服务器实现了优雅的关闭处理和错误管理,确保服务器停止时所有线程都正确终止。

发送源数据以摄取到 RAG 知识图谱中

# server.py

import json
import threading
from typing import List

from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL

class RAGServer:
   def __init__(self):
       self.cq_client = CQClient(address="localhost:50000")
       self.queues_client = QueuesClient(address="localhost:50000")
       model = OpenAiGenerativeModel(model_name="gpt-4o")
       with open("ontology.json", "r") as f:
           ontology = json.load(f)
       ontology = Ontology.from_json(ontology)
       self.kg = KnowledgeGraph(
           name="movies",
           model_config=KnowledgeGraphModelConfig.with_model(model),
           ontology=ontology)
       self.chat = self.kg.chat_session()
       self.shutdown_event = threading.Event()
       self.threads: List[threading.Thread] = []

   def handle_chat(self, request: QueryMessageReceived):
       try:
           message = request.body.decode('utf-8')
           print(f"Received chat message: {message}")
           result= self.chat.send_message(message)
           answer = result.get("response","No answer")
           print(f"Chat response: {answer}")
           response = QueryResponseMessage(
               query_received=request,
               is_executed=True,
               body=answer.encode('utf-8')
           )
           self.cq_client.send_response_message(response)
       except Exception as e:
           print(f"Error processing chat message: {str(e)}")
           self.cq_client.send_response_message(QueryResponseMessage(
               query_received=request,
               is_executed=False,
               error=str(e)
           ))

   def pull_from_queue(self):
       while not self.shutdown_event.is_set():
           try:
               result = self.queues_client.pull("rag-sources-queue", 10, 1)
               if result.is_error:
                   print(f"Error pulling message from queue: {result.error}")
                   continue
               sources = []
               for message in result.messages:
                   source = message.body.decode('utf-8')
                   print(f"Received source: {source}, adding to knowledge graph")
                   sources.append(URL(message.body.decode('utf-8')))
               if sources:
                   self.kg.process_sources(sources)
           except Exception as e:
               if not self.shutdown_event.is_set():  # Only log if not shutting down
                   print(f"Error processing sources: {str(e)}")

   def subscribe_to_chat_queries(self):
       def on_error(err: str):
           if not self.shutdown_event.is_set():  # Only log if not shutting down
               print(f"Error: {err}")

       cancellation_token = CancellationToken()

       try:
           self.cq_client.subscribe_to_queries(
               subscription=QueriesSubscription(
                   channel="rag-chat-query",
                   on_receive_query_callback=self.handle_chat,
                   on_error_callback=on_error,
               ),
               cancel=cancellation_token
           )

           # Wait for shutdown signal
           while not self.shutdown_event.is_set():
               time.sleep(0.1)


           # Cancel subscription when shutdown is requested
           cancellation_token.cancel()

       except Exception as e:
           if not self.shutdown_event.is_set():
               print(f"Error in subscription thread: {str(e)}")
   def run(self):

       chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
       queue_thread = threading.Thread(target=self.pull_from_queue)

       self.threads.extend([chat_thread, queue_thread])

       for thread in self.threads:
           thread.daemon = True  # Make threads daemon so they exit when main thread exits
           thread.start()

       print("RAG server started")
       try:
           while True:
               time.sleep(1)
       except KeyboardInterrupt:
           print("\nShutting down gracefully...")
           self.shutdown()
           self.cq_client.close()
           self.queues_client.close()

   def shutdown(self):

       print("Initiating shutdown sequence...")
       self.shutdown_event.set()  # Signal all threads to stop

       for thread in self.threads:
           thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
           if thread.is_alive():
               print(f"Warning: Thread {thread.name} did not shutdown cleanly")

       print("Shutdown complete")
if __name__ == "__main__":
   rag_server = RAGServer()
   rag_server.run()

此代码实现了一个简单的客户端,通过 KubeMQ 的队列系统将电影 URL 发送到 RAG 服务器。具体来说,它创建一个连接到 KubeMQ 的 SourceClient 类,并将消息发送到“rag-sources-queue”通道,该通道与 RAG 服务器监控的队列相同。当作为主程序运行时,它会发送一个烂番茄电影 URL 列表(包括《黑客帝国》电影、《疾速追杀》和《生死时速》),由 RAG 服务器处理并添加到知识图谱中。

发送和接收问题和解答

# sources_client.py

from kubemq.queues import *

class SourceClient:
   def __init__(self, address="localhost:50000"):
       self.client = Client(address=address)

   def send_source(self, message: str) :
       send_result = self.client.send_queues_message(
           QueueMessage(
               channel="rag-sources-queue",
               body=message.encode("utf-8"),
           )
       )
       if send_result.is_error:
           print(f"message send error, error:{send_result.error}")

if __name__ == "__main__":
   client = SourceClient()
   urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
       "https://www.rottentomatoes.com/m/matrix",
       "https://www.rottentomatoes.com/m/matrix_revolutions",
       "https://www.rottentomatoes.com/m/matrix_reloaded",
       "https://www.rottentomatoes.com/m/speed_1994",
       "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
   for url in urls:
       client.send_source(url)
   print("done")

此代码实现了一个聊天客户端,通过 KubeMQ 的查询系统与 RAG 服务器进行通信。 ChatClient 类将消息发送到“rag-chat-query”通道并等待响应,每个查询有 30 秒的超时时间。当作为主程序运行时,它通过发送两个有关《黑客帝国》导演及其与基努·里维斯的联系的相关问题来演示客户端的功能,并在收到问题时打印每个响应。

代码库

所有代码示例都可以在我的原始 GitHub 存储库的分支中找到。

结论

将 KubeMQ 集成到 GenAI 应用程序的 RAG 管道中,为处理连续数据流和复杂的进程间通信提供了可扩展、可靠且高效的机制。通过充当具有多种消息传递模式的统一路由器,KubeMQ 简化了整体架构,减少了对自定义路由解决方案的需求,并加快了开发周期。

此外,合并 FalkorDB 通过提供与 KubeMQ 无缝集成的高性能知识库来增强数据管理。这种组合可确保优化数据检索和存储,支持 RAG 流程的动态要求。

处理高吞吐量场景的能力,与持久性和容错等功能相结合,确保 GenAI 应用程序即使在重负载或面临系统中断的情况下也能保持响应能力和可靠性。

通过利用 KubeMQ 和 FalkorDB,组织可以专注于增强其 AI 模型并提供有价值的见解和服务,并确信其数据路由基础设施强大且能够满足现代 AI 工作流程的需求。

以上是使用 KubeMQ 增强 GenAI 应用程序:有效扩展检索增强生成 (RAG)的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
python中两个列表的串联替代方案是什么?python中两个列表的串联替代方案是什么?May 09, 2025 am 12:16 AM

可以使用多种方法在Python中连接两个列表:1.使用 操作符,简单但在大列表中效率低;2.使用extend方法,效率高但会修改原列表;3.使用 =操作符,兼具效率和可读性;4.使用itertools.chain函数,内存效率高但需额外导入;5.使用列表解析,优雅但可能过于复杂。选择方法应根据代码上下文和需求。

Python:合并两个列表的有效方法Python:合并两个列表的有效方法May 09, 2025 am 12:15 AM

有多种方法可以合并Python列表:1.使用 操作符,简单但对大列表不内存高效;2.使用extend方法,内存高效但会修改原列表;3.使用itertools.chain,适用于大数据集;4.使用*操作符,一行代码合并小到中型列表;5.使用numpy.concatenate,适用于大数据集和性能要求高的场景;6.使用append方法,适用于小列表但效率低。选择方法时需考虑列表大小和应用场景。

编译的与解释的语言:优点和缺点编译的与解释的语言:优点和缺点May 09, 2025 am 12:06 AM

CompiledLanguagesOffersPeedAndSecurity,而interneterpretledlanguages provideeaseafuseanDoctability.1)commiledlanguageslikec arefasterandSecureButhOnderDevevelmendeclementCyclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesandentency.2)cransportedeplatectentysenty

Python:对于循环,最完整的指南Python:对于循环,最完整的指南May 09, 2025 am 12:05 AM

Python中,for循环用于遍历可迭代对象,while循环用于条件满足时重复执行操作。1)for循环示例:遍历列表并打印元素。2)while循环示例:猜数字游戏,直到猜对为止。掌握循环原理和优化技巧可提高代码效率和可靠性。

python concatenate列表到一个字符串中python concatenate列表到一个字符串中May 09, 2025 am 12:02 AM

要将列表连接成字符串,Python中使用join()方法是最佳选择。1)使用join()方法将列表元素连接成字符串,如''.join(my_list)。2)对于包含数字的列表,先用map(str,numbers)转换为字符串再连接。3)可以使用生成器表达式进行复杂格式化,如','.join(f'({fruit})'forfruitinfruits)。4)处理混合数据类型时,使用map(str,mixed_list)确保所有元素可转换为字符串。5)对于大型列表,使用''.join(large_li

Python的混合方法:编译和解释合并Python的混合方法:编译和解释合并May 08, 2025 am 12:16 AM

pythonuseshybridapprace,ComminingCompilationTobyTecoDeAndInterpretation.1)codeiscompiledtoplatform-Indepententbybytecode.2)bytecodeisisterpretedbybythepbybythepythonvirtualmachine,增强效率和通用性。

了解python的' for”和' then”循环之间的差异了解python的' for”和' then”循环之间的差异May 08, 2025 am 12:11 AM

theKeyDifferencesBetnewpython's“ for”和“ for”和“ loopsare:1)” for“ loopsareIdealForiteringSequenceSquencesSorkNowniterations,而2)”,而“ loopsareBetterforConterContinuingUntilacTientInditionIntionismetismetistismetistwithOutpredefinedInedIterations.un

Python串联列表与重复Python串联列表与重复May 08, 2025 am 12:09 AM

在Python中,可以通过多种方法连接列表并管理重复元素:1)使用 运算符或extend()方法可以保留所有重复元素;2)转换为集合再转回列表可以去除所有重复元素,但会丢失原有顺序;3)使用循环或列表推导式结合集合可以去除重复元素并保持原有顺序。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。