Maison >développement back-end >Tutoriel Python >Améliorer les applications GenAI avec KubeMQ : mise à l'échelle efficace de la génération augmentée par récupération (RAG)
Alors que l'adoption de l'IA générative (GenAI) se développe dans tous les secteurs, les organisations exploitent de plus en plus les techniques de génération augmentée par récupération (RAG) pour renforcer leurs modèles d'IA avec des données en temps réel et riches en contexte. données. La gestion du flux complexe d'informations dans de telles applications pose des défis importants, en particulier lorsqu'il s'agit de données générées en continu à grande échelle. KubeMQ, un courtier de messages robuste, apparaît comme une solution pour rationaliser le routage de plusieurs processus RAG, garantissant une gestion efficace des données dans les applications GenAI.
Pour améliorer encore l'efficacité et l'évolutivité des flux de travail RAG, l'intégration d'une base de données hautes performances telle que FalkorDB est essentielle. FalkorDB fournit une solution de stockage fiable et évolutive pour les bases de connaissances dynamiques dont dépendent les systèmes RAG, garantissant une récupération rapide des données et une intégration transparente avec des systèmes de messagerie comme KubeMQ.
RAG est un paradigme qui améliore les modèles d'IA génératifs en intégrant un mécanisme de récupération, permettant aux modèles d'accéder à des bases de connaissances externes lors de l'inférence. Cette approche améliore considérablement l'exactitude, la pertinence et la rapidité des réponses générées en les fondant sur les informations disponibles les plus récentes et les plus pertinentes.
Dans les flux de travail GenAI typiques employant RAG, le processus comporte plusieurs étapes :
Traitement des requêtes : interprétation de la saisie de l'utilisateur pour comprendre l'intention et le contexte
Récupération : Récupération de documents ou de données pertinents à partir d'une base de connaissances dynamique, telle que FalkorDB, qui garantit un accès rapide et efficace aux informations les plus récentes et pertinentes.
Génération : Produire une réponse en utilisant à la fois les données saisies et les données récupérées
Livraison de la réponse : fourniture du résultat final enrichi à l'utilisateur
La mise à l'échelle de ces étapes, en particulier dans les environnements où les données sont générées et mises à jour en permanence, nécessite un mécanisme efficace et fiable pour le flux de données entre les différents composants du pipeline RAG.
Dans des scénarios tels que les réseaux IoT, les plateformes de médias sociaux ou les systèmes d'analyse en temps réel, de nouvelles données sont sans cesse produites et les modèles d'IA doivent s'adapter rapidement pour intégrer ces informations. Les architectures requête-réponse traditionnelles peuvent devenir des goulots d'étranglement dans des conditions de haut débit, entraînant des problèmes de latence et une dégradation des performances.
KubeMQ gère des scénarios de messagerie à haut débit en fournissant une infrastructure évolutive et robuste pour un routage efficace des données entre les services. En intégrant KubeMQ dans le pipeline RAG, chaque nouveau point de données est publié dans une file d'attente ou un flux de messages, garantissant ainsi que les composants de récupération ont un accès immédiat aux dernières informations sans surcharger le système. Cette capacité de traitement des données en temps réel est cruciale pour maintenir la pertinence et l'exactitude des résultats de GenAI.
KubeMQ offre une variété de modèles de messagerie, notamment les files d'attente, les flux, la publication-abonnement (pub/sub) et les appels de procédure à distance (RPC), ce qui en fait un routeur polyvalent et puissant au sein d'un pipeline RAG. Sa faible latence et ses caractéristiques hautes performances garantissent une livraison rapide des messages, ce qui est essentiel pour les applications GenAI en temps réel où les retards peuvent avoir un impact significatif sur l'expérience utilisateur et l'efficacité du système.
De plus, la capacité de KubeMQ à gérer une logique de routage complexe permet des stratégies sophistiquées de distribution de données. Cela garantit que les différents composants du système d'IA reçoivent précisément les données dont ils ont besoin, quand ils en ont besoin, sans duplication ni retard inutiles.
Alors que KubeMQ achemine efficacement les messages entre les services, FalkorDB complète cela en fournissant une solution de base de données graphique évolutive et hautes performances pour stocker et récupérer les grandes quantités de données requises par les processus RAG. Cette intégration garantit que lorsque les nouvelles données transitent par KubeMQ, elles sont stockées de manière transparente dans FalkorDB, ce qui les rend facilement disponibles pour les opérations de récupération sans introduire de latence ni de goulots d'étranglement.
À mesure que les applications GenAI augmentent à la fois en termes de base d'utilisateurs et de volume de données, l'évolutivité devient une préoccupation primordiale. KubeMQ est évolutif et prend en charge la mise à l'échelle horizontale pour s'adapter de manière transparente à une charge accrue. Il garantit qu'à mesure que le nombre de processus RAG augmente ou que la génération de données s'accélère, l'infrastructure de messagerie reste robuste et réactive.
De plus, KubeMQ assure la persistance des messages et la tolérance aux pannes. En cas de panne du système ou de perturbation du réseau, KubeMQ garantit que les messages ne sont pas perdus et que le système peut récupérer correctement. Cette fiabilité est essentielle au maintien de l'intégrité des applications d'IA dont les utilisateurs dépendent pour obtenir des informations précises et opportunes.
La mise en œuvre de services de routage personnalisés pour le traitement des données dans les pipelines RAG peut être complexe et gourmande en ressources. La création, la maintenance et la mise à l'échelle de ces services nécessitent souvent des efforts de développement importants, ce qui détourne l'attention du développement d'applications d'IA de base.
En adoptant KubeMQ, les organisations éliminent le besoin de créer des solutions de routage sur mesure. KubeMQ fournit des fonctionnalités prêtes à l'emploi qui répondent aux besoins de routage des processus RAG, notamment des modèles de routage complexes, le filtrage des messages et la gestion des priorités. Cela réduit non seulement les frais de développement et de maintenance, mais accélère également la mise sur le marché des solutions GenAI.
KubeMQ propose plusieurs interfaces pour interagir avec ses capacités de courtier de messages :
API REST : permet une intégration indépendante du langage, permettant aux services écrits dans n'importe quel langage de programmation d'envoyer et de recevoir des messages via HTTP
SDK : fournit des bibliothèques client pour divers langages de programmation (tels que Python, Java, Go et .NET), facilitant des modèles de communication plus efficaces et de meilleures performances grâce à des intégrations natives
Cette flexibilité permet aux développeurs de choisir la méthode la plus appropriée pour leur cas d'utilisation spécifique, simplifiant l'architecture et accélérant les cycles de développement. Un point de contact unique pour le routage des données rationalise la communication entre les différents composants du pipeline RAG, améliorant ainsi la cohérence globale du système.
L'exemple de code montre comment créer un système de récupération d'informations sur les films en intégrant KubeMQ dans un pipeline RAG. Il configure un serveur qui ingère les URL de films de Rotten Tomatoes pour créer un graphique de connaissances à l'aide de GPT-4. Les utilisateurs peuvent interagir avec ce système via un client de chat, en envoyant des requêtes liées aux films et en recevant des réponses générées par l'IA. Ce cas d'utilisation montre comment gérer l'ingestion continue de données et le traitement des requêtes en temps réel dans une application pratique, en utilisant KubeMQ pour une gestion efficace des messages et une communication interservices dans le contexte de films.
Service d'ingestion de données : capture et publie de nouvelles données dans les flux KubeMQ dès qu'elles deviennent disponibles
Service de récupération : Abonnez-vous au flux KubeMQ pour recevoir les mises à jour et actualiser la base de connaissances
Service de génération : écoute les demandes de requête, interagit avec le modèle d'IA et génère des réponses
Service de réponse : renvoie les réponses générées aux utilisateurs via les canaux appropriés
Assurez-vous que KubeMQ est opérationnel, ce qui peut être réalisé en le déployant à l'aide de Docker :
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Cette commande démarre KubeMQ avec les ports nécessaires exposés pour les communications REST et gRPC.
Ce code (dépôt GitHub) implémente un serveur RAG qui traite les requêtes de chat et gère les sources de connaissances à l'aide de KubeMQ pour la gestion des messages.
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Le serveur exécute deux threads principaux : un qui s'abonne aux requêtes de chat via un canal appelé "rag-chat-query" et les traite à l'aide d'un graphe de connaissances avec GPT-4, et un autre qui extrait continuellement d'une file d'attente appelée "rag -sources-queue" pour ajouter de nouvelles sources au graphe de connaissances. Le graphe de connaissances est initialisé avec une ontologie personnalisée chargée à partir d'un fichier JSON et utilise le modèle GPT-4 d'OpenAI pour le traitement. Le serveur implémente une gestion des arrêts et des erreurs en douceur, garantissant que tous les threads sont correctement terminés lorsque le serveur est arrêté.
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
Ce code implémente un client simple qui envoie les URL des films au serveur RAG via le système de file d'attente de KubeMQ. Plus précisément, il crée une classe SourceClient qui se connecte à KubeMQ et envoie des messages au canal « rag-sources-queue », qui est la même file d'attente que celle surveillée par le serveur RAG. Lorsqu'il est exécuté en tant que programme principal, il envoie une liste d'URL de films Rotten Tomatoes (y compris les films Matrix, John Wick et Speed) à traiter et à ajouter au graphique de connaissances par le serveur RAG.
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
Ce code implémente un client de chat qui communique avec le serveur RAG via le système de requêtes de KubeMQ. La classe ChatClient envoie des messages au canal "rag-chat-query" et attend les réponses, avec un délai d'expiration de 30 secondes pour chaque requête. Lorsqu'il est exécuté en tant que programme principal, il démontre la fonctionnalité du client en envoyant deux questions connexes sur le réalisateur de Matrix et sa connexion à Keanu Reeves, en imprimant chaque réponse au fur et à mesure qu'il les reçoit.
Tous les exemples de code peuvent être trouvés dans mon fork du référentiel GitHub d'origine.
L'intégration de KubeMQ dans les pipelines RAG pour les applications GenAI fournit un mécanisme évolutif, fiable et efficace pour gérer les flux de données continus et les communications inter-processus complexes. En servant de routeur unifié avec des modèles de messagerie polyvalents, KubeMQ simplifie l'architecture globale, réduit le besoin de solutions de routage personnalisées et accélère les cycles de développement.
De plus, l'intégration de FalkorDB améliore la gestion des données en offrant une base de connaissances hautes performances qui s'intègre parfaitement à KubeMQ. Cette combinaison garantit une récupération et un stockage optimisés des données, prenant en charge les exigences dynamiques des processus RAG.
La capacité à gérer des scénarios à haut débit, combinée à des fonctionnalités telles que la persistance et la tolérance aux pannes, garantit que les applications GenAI restent réactives et fiables, même sous de lourdes charges ou face à des perturbations du système.
En tirant parti de KubeMQ et FalkorDB, les organisations peuvent se concentrer sur l'amélioration de leurs modèles d'IA et fournir des informations et des services précieux, en étant sûres que leur infrastructure de routage de données est robuste et capable de répondre aux exigences des flux de travail d'IA modernes.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!