recherche
Maisondéveloppement back-endTutoriel PythonAméliorer les applications GenAI avec KubeMQ : mise à l'échelle efficace de la génération augmentée par récupération (RAG)

Enhancing GenAI Applications With KubeMQ: Efficiently Scaling Retrieval-Augmented Generation (RAG)

Alors que l'adoption de l'IA générative (GenAI) se développe dans tous les secteurs, les organisations exploitent de plus en plus les techniques de génération augmentée par récupération (RAG) pour renforcer leurs modèles d'IA avec des données en temps réel et riches en contexte. données. La gestion du flux complexe d'informations dans de telles applications pose des défis importants, en particulier lorsqu'il s'agit de données générées en continu à grande échelle. KubeMQ, un courtier de messages robuste, apparaît comme une solution pour rationaliser le routage de plusieurs processus RAG, garantissant une gestion efficace des données dans les applications GenAI.

Pour améliorer encore l'efficacité et l'évolutivité des flux de travail RAG, l'intégration d'une base de données hautes performances telle que FalkorDB est essentielle. FalkorDB fournit une solution de stockage fiable et évolutive pour les bases de connaissances dynamiques dont dépendent les systèmes RAG, garantissant une récupération rapide des données et une intégration transparente avec des systèmes de messagerie comme KubeMQ.

Comprendre RAG dans les workflows GenAI

RAG est un paradigme qui améliore les modèles d'IA génératifs en intégrant un mécanisme de récupération, permettant aux modèles d'accéder à des bases de connaissances externes lors de l'inférence. Cette approche améliore considérablement l'exactitude, la pertinence et la rapidité des réponses générées en les fondant sur les informations disponibles les plus récentes et les plus pertinentes.

Dans les flux de travail GenAI typiques employant RAG, le processus comporte plusieurs étapes :

  1. Traitement des requêtes : interprétation de la saisie de l'utilisateur pour comprendre l'intention et le contexte

  2. Récupération : Récupération de documents ou de données pertinents à partir d'une base de connaissances dynamique, telle que FalkorDB, qui garantit un accès rapide et efficace aux informations les plus récentes et pertinentes.

  3. Génération : Produire une réponse en utilisant à la fois les données saisies et les données récupérées

  4. Livraison de la réponse : fourniture du résultat final enrichi à l'utilisateur

La mise à l'échelle de ces étapes, en particulier dans les environnements où les données sont générées et mises à jour en permanence, nécessite un mécanisme efficace et fiable pour le flux de données entre les différents composants du pipeline RAG.

Le rôle critique de KubeMQ dans le traitement RAG

Gérer des flux de données continus à grande échelle

Dans des scénarios tels que les réseaux IoT, les plateformes de médias sociaux ou les systèmes d'analyse en temps réel, de nouvelles données sont sans cesse produites et les modèles d'IA doivent s'adapter rapidement pour intégrer ces informations. Les architectures requête-réponse traditionnelles peuvent devenir des goulots d'étranglement dans des conditions de haut débit, entraînant des problèmes de latence et une dégradation des performances.

KubeMQ gère des scénarios de messagerie à haut débit en fournissant une infrastructure évolutive et robuste pour un routage efficace des données entre les services. En intégrant KubeMQ dans le pipeline RAG, chaque nouveau point de données est publié dans une file d'attente ou un flux de messages, garantissant ainsi que les composants de récupération ont un accès immédiat aux dernières informations sans surcharger le système. Cette capacité de traitement des données en temps réel est cruciale pour maintenir la pertinence et l'exactitude des résultats de GenAI.

Servir de routeur optimal

KubeMQ offre une variété de modèles de messagerie, notamment les files d'attente, les flux, la publication-abonnement (pub/sub) et les appels de procédure à distance (RPC), ce qui en fait un routeur polyvalent et puissant au sein d'un pipeline RAG. Sa faible latence et ses caractéristiques hautes performances garantissent une livraison rapide des messages, ce qui est essentiel pour les applications GenAI en temps réel où les retards peuvent avoir un impact significatif sur l'expérience utilisateur et l'efficacité du système.

De plus, la capacité de KubeMQ à gérer une logique de routage complexe permet des stratégies sophistiquées de distribution de données. Cela garantit que les différents composants du système d'IA reçoivent précisément les données dont ils ont besoin, quand ils en ont besoin, sans duplication ni retard inutiles.

Intégration de FalkorDB pour une gestion améliorée des données

Alors que KubeMQ achemine efficacement les messages entre les services, FalkorDB complète cela en fournissant une solution de base de données graphique évolutive et hautes performances pour stocker et récupérer les grandes quantités de données requises par les processus RAG. Cette intégration garantit que lorsque les nouvelles données transitent par KubeMQ, elles sont stockées de manière transparente dans FalkorDB, ce qui les rend facilement disponibles pour les opérations de récupération sans introduire de latence ni de goulots d'étranglement.

Améliorer l'évolutivité et la fiabilité

À mesure que les applications GenAI augmentent à la fois en termes de base d'utilisateurs et de volume de données, l'évolutivité devient une préoccupation primordiale. KubeMQ est évolutif et prend en charge la mise à l'échelle horizontale pour s'adapter de manière transparente à une charge accrue. Il garantit qu'à mesure que le nombre de processus RAG augmente ou que la génération de données s'accélère, l'infrastructure de messagerie reste robuste et réactive.

De plus, KubeMQ assure la persistance des messages et la tolérance aux pannes. En cas de panne du système ou de perturbation du réseau, KubeMQ garantit que les messages ne sont pas perdus et que le système peut récupérer correctement. Cette fiabilité est essentielle au maintien de l'intégrité des applications d'IA dont les utilisateurs dépendent pour obtenir des informations précises et opportunes.

Éliminer le besoin de services de routage dédiés

La mise en œuvre de services de routage personnalisés pour le traitement des données dans les pipelines RAG peut être complexe et gourmande en ressources. La création, la maintenance et la mise à l'échelle de ces services nécessitent souvent des efforts de développement importants, ce qui détourne l'attention du développement d'applications d'IA de base.

En adoptant KubeMQ, les organisations éliminent le besoin de créer des solutions de routage sur mesure. KubeMQ fournit des fonctionnalités prêtes à l'emploi qui répondent aux besoins de routage des processus RAG, notamment des modèles de routage complexes, le filtrage des messages et la gestion des priorités. Cela réduit non seulement les frais de développement et de maintenance, mais accélère également la mise sur le marché des solutions GenAI.

Accès unifié via REST et SDK

KubeMQ propose plusieurs interfaces pour interagir avec ses capacités de courtier de messages :

  • API REST : permet une intégration indépendante du langage, permettant aux services écrits dans n'importe quel langage de programmation d'envoyer et de recevoir des messages via HTTP

  • SDK : fournit des bibliothèques client pour divers langages de programmation (tels que Python, Java, Go et .NET), facilitant des modèles de communication plus efficaces et de meilleures performances grâce à des intégrations natives

Cette flexibilité permet aux développeurs de choisir la méthode la plus appropriée pour leur cas d'utilisation spécifique, simplifiant l'architecture et accélérant les cycles de développement. Un point de contact unique pour le routage des données rationalise la communication entre les différents composants du pipeline RAG, améliorant ainsi la cohérence globale du système.

Implémentation de KubeMQ dans un pipeline RAG : un exemple détaillé

L'exemple de code montre comment créer un système de récupération d'informations sur les films en intégrant KubeMQ dans un pipeline RAG. Il configure un serveur qui ingère les URL de films de Rotten Tomatoes pour créer un graphique de connaissances à l'aide de GPT-4. Les utilisateurs peuvent interagir avec ce système via un client de chat, en envoyant des requêtes liées aux films et en recevant des réponses générées par l'IA. Ce cas d'utilisation montre comment gérer l'ingestion continue de données et le traitement des requêtes en temps réel dans une application pratique, en utilisant KubeMQ pour une gestion efficace des messages et une communication interservices dans le contexte de films.

Présentation de l'architecture

  1. Service d'ingestion de données : capture et publie de nouvelles données dans les flux KubeMQ dès qu'elles deviennent disponibles

  2. Service de récupération : Abonnez-vous au flux KubeMQ pour recevoir les mises à jour et actualiser la base de connaissances

  3. Service de génération : écoute les demandes de requête, interagit avec le modèle d'IA et génère des réponses

  4. Service de réponse : renvoie les réponses générées aux utilisateurs via les canaux appropriés

Configuration de KubeMQ

Assurez-vous que KubeMQ est opérationnel, ce qui peut être réalisé en le déployant à l'aide de Docker :

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

Cette commande démarre KubeMQ avec les ports nécessaires exposés pour les communications REST et gRPC.

Côté serveur RAG

Ce code (dépôt GitHub) implémente un serveur RAG qui traite les requêtes de chat et gère les sources de connaissances à l'aide de KubeMQ pour la gestion des messages.

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"

Le serveur exécute deux threads principaux : un qui s'abonne aux requêtes de chat via un canal appelé "rag-chat-query" et les traite à l'aide d'un graphe de connaissances avec GPT-4, et un autre qui extrait continuellement d'une file d'attente appelée "rag -sources-queue" pour ajouter de nouvelles sources au graphe de connaissances. Le graphe de connaissances est initialisé avec une ontologie personnalisée chargée à partir d'un fichier JSON et utilise le modèle GPT-4 d'OpenAI pour le traitement. Le serveur implémente une gestion des arrêts et des erreurs en douceur, garantissant que tous les threads sont correctement terminés lorsque le serveur est arrêté.

Envoi de données sources à ingérer dans RAG Knowledge Graph

# server.py

import json
import threading
from typing import List

from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL

class RAGServer:
   def __init__(self):
       self.cq_client = CQClient(address="localhost:50000")
       self.queues_client = QueuesClient(address="localhost:50000")
       model = OpenAiGenerativeModel(model_name="gpt-4o")
       with open("ontology.json", "r") as f:
           ontology = json.load(f)
       ontology = Ontology.from_json(ontology)
       self.kg = KnowledgeGraph(
           name="movies",
           model_config=KnowledgeGraphModelConfig.with_model(model),
           ontology=ontology)
       self.chat = self.kg.chat_session()
       self.shutdown_event = threading.Event()
       self.threads: List[threading.Thread] = []

   def handle_chat(self, request: QueryMessageReceived):
       try:
           message = request.body.decode('utf-8')
           print(f"Received chat message: {message}")
           result= self.chat.send_message(message)
           answer = result.get("response","No answer")
           print(f"Chat response: {answer}")
           response = QueryResponseMessage(
               query_received=request,
               is_executed=True,
               body=answer.encode('utf-8')
           )
           self.cq_client.send_response_message(response)
       except Exception as e:
           print(f"Error processing chat message: {str(e)}")
           self.cq_client.send_response_message(QueryResponseMessage(
               query_received=request,
               is_executed=False,
               error=str(e)
           ))

   def pull_from_queue(self):
       while not self.shutdown_event.is_set():
           try:
               result = self.queues_client.pull("rag-sources-queue", 10, 1)
               if result.is_error:
                   print(f"Error pulling message from queue: {result.error}")
                   continue
               sources = []
               for message in result.messages:
                   source = message.body.decode('utf-8')
                   print(f"Received source: {source}, adding to knowledge graph")
                   sources.append(URL(message.body.decode('utf-8')))
               if sources:
                   self.kg.process_sources(sources)
           except Exception as e:
               if not self.shutdown_event.is_set():  # Only log if not shutting down
                   print(f"Error processing sources: {str(e)}")

   def subscribe_to_chat_queries(self):
       def on_error(err: str):
           if not self.shutdown_event.is_set():  # Only log if not shutting down
               print(f"Error: {err}")

       cancellation_token = CancellationToken()

       try:
           self.cq_client.subscribe_to_queries(
               subscription=QueriesSubscription(
                   channel="rag-chat-query",
                   on_receive_query_callback=self.handle_chat,
                   on_error_callback=on_error,
               ),
               cancel=cancellation_token
           )

           # Wait for shutdown signal
           while not self.shutdown_event.is_set():
               time.sleep(0.1)


           # Cancel subscription when shutdown is requested
           cancellation_token.cancel()

       except Exception as e:
           if not self.shutdown_event.is_set():
               print(f"Error in subscription thread: {str(e)}")
   def run(self):

       chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
       queue_thread = threading.Thread(target=self.pull_from_queue)

       self.threads.extend([chat_thread, queue_thread])

       for thread in self.threads:
           thread.daemon = True  # Make threads daemon so they exit when main thread exits
           thread.start()

       print("RAG server started")
       try:
           while True:
               time.sleep(1)
       except KeyboardInterrupt:
           print("\nShutting down gracefully...")
           self.shutdown()
           self.cq_client.close()
           self.queues_client.close()

   def shutdown(self):

       print("Initiating shutdown sequence...")
       self.shutdown_event.set()  # Signal all threads to stop

       for thread in self.threads:
           thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
           if thread.is_alive():
               print(f"Warning: Thread {thread.name} did not shutdown cleanly")

       print("Shutdown complete")
if __name__ == "__main__":
   rag_server = RAGServer()
   rag_server.run()

Ce code implémente un client simple qui envoie les URL des films au serveur RAG via le système de file d'attente de KubeMQ. Plus précisément, il crée une classe SourceClient qui se connecte à KubeMQ et envoie des messages au canal « rag-sources-queue », qui est la même file d'attente que celle surveillée par le serveur RAG. Lorsqu'il est exécuté en tant que programme principal, il envoie une liste d'URL de films Rotten Tomatoes (y compris les films Matrix, John Wick et Speed) à traiter et à ajouter au graphique de connaissances par le serveur RAG.

Envoyer et recevoir des questions et des réponses

# sources_client.py

from kubemq.queues import *

class SourceClient:
   def __init__(self, address="localhost:50000"):
       self.client = Client(address=address)

   def send_source(self, message: str) :
       send_result = self.client.send_queues_message(
           QueueMessage(
               channel="rag-sources-queue",
               body=message.encode("utf-8"),
           )
       )
       if send_result.is_error:
           print(f"message send error, error:{send_result.error}")

if __name__ == "__main__":
   client = SourceClient()
   urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
       "https://www.rottentomatoes.com/m/matrix",
       "https://www.rottentomatoes.com/m/matrix_revolutions",
       "https://www.rottentomatoes.com/m/matrix_reloaded",
       "https://www.rottentomatoes.com/m/speed_1994",
       "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
   for url in urls:
       client.send_source(url)
   print("done")

Ce code implémente un client de chat qui communique avec le serveur RAG via le système de requêtes de KubeMQ. La classe ChatClient envoie des messages au canal "rag-chat-query" et attend les réponses, avec un délai d'expiration de 30 secondes pour chaque requête. Lorsqu'il est exécuté en tant que programme principal, il démontre la fonctionnalité du client en envoyant deux questions connexes sur le réalisateur de Matrix et sa connexion à Keanu Reeves, en imprimant chaque réponse au fur et à mesure qu'il les reçoit.

Référentiel de codes

Tous les exemples de code peuvent être trouvés dans mon fork du référentiel GitHub d'origine.

Conclusion

L'intégration de KubeMQ dans les pipelines RAG pour les applications GenAI fournit un mécanisme évolutif, fiable et efficace pour gérer les flux de données continus et les communications inter-processus complexes. En servant de routeur unifié avec des modèles de messagerie polyvalents, KubeMQ simplifie l'architecture globale, réduit le besoin de solutions de routage personnalisées et accélère les cycles de développement.

De plus, l'intégration de FalkorDB améliore la gestion des données en offrant une base de connaissances hautes performances qui s'intègre parfaitement à KubeMQ. Cette combinaison garantit une récupération et un stockage optimisés des données, prenant en charge les exigences dynamiques des processus RAG.

La capacité à gérer des scénarios à haut débit, combinée à des fonctionnalités telles que la persistance et la tolérance aux pannes, garantit que les applications GenAI restent réactives et fiables, même sous de lourdes charges ou face à des perturbations du système.

En tirant parti de KubeMQ et FalkorDB, les organisations peuvent se concentrer sur l'amélioration de leurs modèles d'IA et fournir des informations et des services précieux, en étant sûres que leur infrastructure de routage de données est robuste et capable de répondre aux exigences des flux de travail d'IA modernes.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Comment résoudre le problème des autorisations rencontré lors de la visualisation de la version Python dans le terminal Linux?Comment résoudre le problème des autorisations rencontré lors de la visualisation de la version Python dans le terminal Linux?Apr 01, 2025 pm 05:09 PM

Solution aux problèmes d'autorisation Lors de la visualisation de la version Python dans Linux Terminal Lorsque vous essayez d'afficher la version Python dans Linux Terminal, entrez Python ...

Comment utiliser la belle soupe pour analyser HTML?Comment utiliser la belle soupe pour analyser HTML?Mar 10, 2025 pm 06:54 PM

Cet article explique comment utiliser la belle soupe, une bibliothèque Python, pour analyser HTML. Il détaille des méthodes courantes comme find (), find_all (), select () et get_text () pour l'extraction des données, la gestion de diverses structures et erreurs HTML et alternatives (Sel

Comment effectuer l'apprentissage en profondeur avec TensorFlow ou Pytorch?Comment effectuer l'apprentissage en profondeur avec TensorFlow ou Pytorch?Mar 10, 2025 pm 06:52 PM

Cet article compare TensorFlow et Pytorch pour l'apprentissage en profondeur. Il détaille les étapes impliquées: préparation des données, construction de modèles, formation, évaluation et déploiement. Différences clés entre les cadres, en particulier en ce qui concerne le raisin informatique

Comment copier efficacement la colonne entière d'une dataframe dans une autre dataframe avec différentes structures dans Python?Comment copier efficacement la colonne entière d'une dataframe dans une autre dataframe avec différentes structures dans Python?Apr 01, 2025 pm 11:15 PM

Lorsque vous utilisez la bibliothèque Pandas de Python, comment copier des colonnes entières entre deux frames de données avec différentes structures est un problème courant. Supposons que nous ayons deux dats ...

Comment créer des interfaces de ligne de commande (CLI) avec Python?Comment créer des interfaces de ligne de commande (CLI) avec Python?Mar 10, 2025 pm 06:48 PM

Cet article guide les développeurs Python sur la construction d'interfaces de ligne de commande (CLI). Il détaille à l'aide de bibliothèques comme Typer, Click et Argparse, mettant l'accent sur la gestion des entrées / sorties et promouvant des modèles de conception conviviaux pour une meilleure convivialité par la CLI.

Quelles sont les bibliothèques Python populaires et leurs utilisations?Quelles sont les bibliothèques Python populaires et leurs utilisations?Mar 21, 2025 pm 06:46 PM

L'article traite des bibliothèques Python populaires comme Numpy, Pandas, Matplotlib, Scikit-Learn, Tensorflow, Django, Flask et Demandes, détaillant leurs utilisations dans le calcul scientifique, l'analyse des données, la visualisation, l'apprentissage automatique, le développement Web et H et H

Que sont les expressions régulières?Que sont les expressions régulières?Mar 20, 2025 pm 06:25 PM

Les expressions régulières sont des outils puissants pour la correspondance des motifs et la manipulation du texte dans la programmation, améliorant l'efficacité du traitement de texte sur diverses applications.

Expliquez le but des environnements virtuels dans Python.Expliquez le but des environnements virtuels dans Python.Mar 19, 2025 pm 02:27 PM

L'article traite du rôle des environnements virtuels dans Python, en se concentrant sur la gestion des dépendances du projet et l'évitement des conflits. Il détaille leur création, leur activation et leurs avantages pour améliorer la gestion de projet et réduire les problèmes de dépendance.

See all articles

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
3 Il y a quelques semainesBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Meilleurs paramètres graphiques
3 Il y a quelques semainesBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Comment réparer l'audio si vous n'entendez personne
3 Il y a quelques semainesBy尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Comment déverrouiller tout dans Myrise
3 Il y a quelques semainesBy尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Version crackée d'EditPlus en chinois

Version crackée d'EditPlus en chinois

Petite taille, coloration syntaxique, ne prend pas en charge la fonction d'invite de code

Navigateur d'examen sécurisé

Navigateur d'examen sécurisé

Safe Exam Browser est un environnement de navigation sécurisé permettant de passer des examens en ligne en toute sécurité. Ce logiciel transforme n'importe quel ordinateur en poste de travail sécurisé. Il contrôle l'accès à n'importe quel utilitaire et empêche les étudiants d'utiliser des ressources non autorisées.

mPDF

mPDF

mPDF est une bibliothèque PHP qui peut générer des fichiers PDF à partir de HTML encodé en UTF-8. L'auteur original, Ian Back, a écrit mPDF pour générer des fichiers PDF « à la volée » depuis son site Web et gérer différentes langues. Il est plus lent et produit des fichiers plus volumineux lors de l'utilisation de polices Unicode que les scripts originaux comme HTML2FPDF, mais prend en charge les styles CSS, etc. et présente de nombreuses améliorations. Prend en charge presque toutes les langues, y compris RTL (arabe et hébreu) ​​et CJK (chinois, japonais et coréen). Prend en charge les éléments imbriqués au niveau du bloc (tels que P, DIV),

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

MantisBT

MantisBT

Mantis est un outil Web de suivi des défauts facile à déployer, conçu pour faciliter le suivi des défauts des produits. Cela nécessite PHP, MySQL et un serveur Web. Découvrez nos services de démonstration et d'hébergement.