


Alors que l'adoption de l'IA générative (GenAI) se développe dans tous les secteurs, les organisations exploitent de plus en plus les techniques de génération augmentée par récupération (RAG) pour renforcer leurs modèles d'IA avec des données en temps réel et riches en contexte. données. La gestion du flux complexe d'informations dans de telles applications pose des défis importants, en particulier lorsqu'il s'agit de données générées en continu à grande échelle. KubeMQ, un courtier de messages robuste, apparaît comme une solution pour rationaliser le routage de plusieurs processus RAG, garantissant une gestion efficace des données dans les applications GenAI.
Pour améliorer encore l'efficacité et l'évolutivité des flux de travail RAG, l'intégration d'une base de données hautes performances telle que FalkorDB est essentielle. FalkorDB fournit une solution de stockage fiable et évolutive pour les bases de connaissances dynamiques dont dépendent les systèmes RAG, garantissant une récupération rapide des données et une intégration transparente avec des systèmes de messagerie comme KubeMQ.
Comprendre RAG dans les workflows GenAI
RAG est un paradigme qui améliore les modèles d'IA génératifs en intégrant un mécanisme de récupération, permettant aux modèles d'accéder à des bases de connaissances externes lors de l'inférence. Cette approche améliore considérablement l'exactitude, la pertinence et la rapidité des réponses générées en les fondant sur les informations disponibles les plus récentes et les plus pertinentes.
Dans les flux de travail GenAI typiques employant RAG, le processus comporte plusieurs étapes :
Traitement des requêtes : interprétation de la saisie de l'utilisateur pour comprendre l'intention et le contexte
Récupération : Récupération de documents ou de données pertinents à partir d'une base de connaissances dynamique, telle que FalkorDB, qui garantit un accès rapide et efficace aux informations les plus récentes et pertinentes.
Génération : Produire une réponse en utilisant à la fois les données saisies et les données récupérées
Livraison de la réponse : fourniture du résultat final enrichi à l'utilisateur
La mise à l'échelle de ces étapes, en particulier dans les environnements où les données sont générées et mises à jour en permanence, nécessite un mécanisme efficace et fiable pour le flux de données entre les différents composants du pipeline RAG.
Le rôle critique de KubeMQ dans le traitement RAG
Gérer des flux de données continus à grande échelle
Dans des scénarios tels que les réseaux IoT, les plateformes de médias sociaux ou les systèmes d'analyse en temps réel, de nouvelles données sont sans cesse produites et les modèles d'IA doivent s'adapter rapidement pour intégrer ces informations. Les architectures requête-réponse traditionnelles peuvent devenir des goulots d'étranglement dans des conditions de haut débit, entraînant des problèmes de latence et une dégradation des performances.
KubeMQ gère des scénarios de messagerie à haut débit en fournissant une infrastructure évolutive et robuste pour un routage efficace des données entre les services. En intégrant KubeMQ dans le pipeline RAG, chaque nouveau point de données est publié dans une file d'attente ou un flux de messages, garantissant ainsi que les composants de récupération ont un accès immédiat aux dernières informations sans surcharger le système. Cette capacité de traitement des données en temps réel est cruciale pour maintenir la pertinence et l'exactitude des résultats de GenAI.
Servir de routeur optimal
KubeMQ offre une variété de modèles de messagerie, notamment les files d'attente, les flux, la publication-abonnement (pub/sub) et les appels de procédure à distance (RPC), ce qui en fait un routeur polyvalent et puissant au sein d'un pipeline RAG. Sa faible latence et ses caractéristiques hautes performances garantissent une livraison rapide des messages, ce qui est essentiel pour les applications GenAI en temps réel où les retards peuvent avoir un impact significatif sur l'expérience utilisateur et l'efficacité du système.
De plus, la capacité de KubeMQ à gérer une logique de routage complexe permet des stratégies sophistiquées de distribution de données. Cela garantit que les différents composants du système d'IA reçoivent précisément les données dont ils ont besoin, quand ils en ont besoin, sans duplication ni retard inutiles.
Intégration de FalkorDB pour une gestion améliorée des données
Alors que KubeMQ achemine efficacement les messages entre les services, FalkorDB complète cela en fournissant une solution de base de données graphique évolutive et hautes performances pour stocker et récupérer les grandes quantités de données requises par les processus RAG. Cette intégration garantit que lorsque les nouvelles données transitent par KubeMQ, elles sont stockées de manière transparente dans FalkorDB, ce qui les rend facilement disponibles pour les opérations de récupération sans introduire de latence ni de goulots d'étranglement.
Améliorer l'évolutivité et la fiabilité
À mesure que les applications GenAI augmentent à la fois en termes de base d'utilisateurs et de volume de données, l'évolutivité devient une préoccupation primordiale. KubeMQ est évolutif et prend en charge la mise à l'échelle horizontale pour s'adapter de manière transparente à une charge accrue. Il garantit qu'à mesure que le nombre de processus RAG augmente ou que la génération de données s'accélère, l'infrastructure de messagerie reste robuste et réactive.
De plus, KubeMQ assure la persistance des messages et la tolérance aux pannes. En cas de panne du système ou de perturbation du réseau, KubeMQ garantit que les messages ne sont pas perdus et que le système peut récupérer correctement. Cette fiabilité est essentielle au maintien de l'intégrité des applications d'IA dont les utilisateurs dépendent pour obtenir des informations précises et opportunes.
Éliminer le besoin de services de routage dédiés
La mise en œuvre de services de routage personnalisés pour le traitement des données dans les pipelines RAG peut être complexe et gourmande en ressources. La création, la maintenance et la mise à l'échelle de ces services nécessitent souvent des efforts de développement importants, ce qui détourne l'attention du développement d'applications d'IA de base.
En adoptant KubeMQ, les organisations éliminent le besoin de créer des solutions de routage sur mesure. KubeMQ fournit des fonctionnalités prêtes à l'emploi qui répondent aux besoins de routage des processus RAG, notamment des modèles de routage complexes, le filtrage des messages et la gestion des priorités. Cela réduit non seulement les frais de développement et de maintenance, mais accélère également la mise sur le marché des solutions GenAI.
Accès unifié via REST et SDK
KubeMQ propose plusieurs interfaces pour interagir avec ses capacités de courtier de messages :
API REST : permet une intégration indépendante du langage, permettant aux services écrits dans n'importe quel langage de programmation d'envoyer et de recevoir des messages via HTTP
SDK : fournit des bibliothèques client pour divers langages de programmation (tels que Python, Java, Go et .NET), facilitant des modèles de communication plus efficaces et de meilleures performances grâce à des intégrations natives
Cette flexibilité permet aux développeurs de choisir la méthode la plus appropriée pour leur cas d'utilisation spécifique, simplifiant l'architecture et accélérant les cycles de développement. Un point de contact unique pour le routage des données rationalise la communication entre les différents composants du pipeline RAG, améliorant ainsi la cohérence globale du système.
Implémentation de KubeMQ dans un pipeline RAG : un exemple détaillé
L'exemple de code montre comment créer un système de récupération d'informations sur les films en intégrant KubeMQ dans un pipeline RAG. Il configure un serveur qui ingère les URL de films de Rotten Tomatoes pour créer un graphique de connaissances à l'aide de GPT-4. Les utilisateurs peuvent interagir avec ce système via un client de chat, en envoyant des requêtes liées aux films et en recevant des réponses générées par l'IA. Ce cas d'utilisation montre comment gérer l'ingestion continue de données et le traitement des requêtes en temps réel dans une application pratique, en utilisant KubeMQ pour une gestion efficace des messages et une communication interservices dans le contexte de films.
Présentation de l'architecture
Service d'ingestion de données : capture et publie de nouvelles données dans les flux KubeMQ dès qu'elles deviennent disponibles
Service de récupération : Abonnez-vous au flux KubeMQ pour recevoir les mises à jour et actualiser la base de connaissances
Service de génération : écoute les demandes de requête, interagit avec le modèle d'IA et génère des réponses
Service de réponse : renvoie les réponses générées aux utilisateurs via les canaux appropriés
Configuration de KubeMQ
Assurez-vous que KubeMQ est opérationnel, ce qui peut être réalisé en le déployant à l'aide de Docker :
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Cette commande démarre KubeMQ avec les ports nécessaires exposés pour les communications REST et gRPC.
Côté serveur RAG
Ce code (dépôt GitHub) implémente un serveur RAG qui traite les requêtes de chat et gère les sources de connaissances à l'aide de KubeMQ pour la gestion des messages.
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Le serveur exécute deux threads principaux : un qui s'abonne aux requêtes de chat via un canal appelé "rag-chat-query" et les traite à l'aide d'un graphe de connaissances avec GPT-4, et un autre qui extrait continuellement d'une file d'attente appelée "rag -sources-queue" pour ajouter de nouvelles sources au graphe de connaissances. Le graphe de connaissances est initialisé avec une ontologie personnalisée chargée à partir d'un fichier JSON et utilise le modèle GPT-4 d'OpenAI pour le traitement. Le serveur implémente une gestion des arrêts et des erreurs en douceur, garantissant que tous les threads sont correctement terminés lorsque le serveur est arrêté.
Envoi de données sources à ingérer dans RAG Knowledge Graph
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
Ce code implémente un client simple qui envoie les URL des films au serveur RAG via le système de file d'attente de KubeMQ. Plus précisément, il crée une classe SourceClient qui se connecte à KubeMQ et envoie des messages au canal « rag-sources-queue », qui est la même file d'attente que celle surveillée par le serveur RAG. Lorsqu'il est exécuté en tant que programme principal, il envoie une liste d'URL de films Rotten Tomatoes (y compris les films Matrix, John Wick et Speed) à traiter et à ajouter au graphique de connaissances par le serveur RAG.
Envoyer et recevoir des questions et des réponses
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
Ce code implémente un client de chat qui communique avec le serveur RAG via le système de requêtes de KubeMQ. La classe ChatClient envoie des messages au canal "rag-chat-query" et attend les réponses, avec un délai d'expiration de 30 secondes pour chaque requête. Lorsqu'il est exécuté en tant que programme principal, il démontre la fonctionnalité du client en envoyant deux questions connexes sur le réalisateur de Matrix et sa connexion à Keanu Reeves, en imprimant chaque réponse au fur et à mesure qu'il les reçoit.
Référentiel de codes
Tous les exemples de code peuvent être trouvés dans mon fork du référentiel GitHub d'origine.
Conclusion
L'intégration de KubeMQ dans les pipelines RAG pour les applications GenAI fournit un mécanisme évolutif, fiable et efficace pour gérer les flux de données continus et les communications inter-processus complexes. En servant de routeur unifié avec des modèles de messagerie polyvalents, KubeMQ simplifie l'architecture globale, réduit le besoin de solutions de routage personnalisées et accélère les cycles de développement.
De plus, l'intégration de FalkorDB améliore la gestion des données en offrant une base de connaissances hautes performances qui s'intègre parfaitement à KubeMQ. Cette combinaison garantit une récupération et un stockage optimisés des données, prenant en charge les exigences dynamiques des processus RAG.
La capacité à gérer des scénarios à haut débit, combinée à des fonctionnalités telles que la persistance et la tolérance aux pannes, garantit que les applications GenAI restent réactives et fiables, même sous de lourdes charges ou face à des perturbations du système.
En tirant parti de KubeMQ et FalkorDB, les organisations peuvent se concentrer sur l'amélioration de leurs modèles d'IA et fournir des informations et des services précieux, en étant sûres que leur infrastructure de routage de données est robuste et capable de répondre aux exigences des flux de travail d'IA modernes.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Solution aux problèmes d'autorisation Lors de la visualisation de la version Python dans Linux Terminal Lorsque vous essayez d'afficher la version Python dans Linux Terminal, entrez Python ...

Cet article explique comment utiliser la belle soupe, une bibliothèque Python, pour analyser HTML. Il détaille des méthodes courantes comme find (), find_all (), select () et get_text () pour l'extraction des données, la gestion de diverses structures et erreurs HTML et alternatives (Sel

Cet article compare TensorFlow et Pytorch pour l'apprentissage en profondeur. Il détaille les étapes impliquées: préparation des données, construction de modèles, formation, évaluation et déploiement. Différences clés entre les cadres, en particulier en ce qui concerne le raisin informatique

Lorsque vous utilisez la bibliothèque Pandas de Python, comment copier des colonnes entières entre deux frames de données avec différentes structures est un problème courant. Supposons que nous ayons deux dats ...

Cet article guide les développeurs Python sur la construction d'interfaces de ligne de commande (CLI). Il détaille à l'aide de bibliothèques comme Typer, Click et Argparse, mettant l'accent sur la gestion des entrées / sorties et promouvant des modèles de conception conviviaux pour une meilleure convivialité par la CLI.

L'article traite des bibliothèques Python populaires comme Numpy, Pandas, Matplotlib, Scikit-Learn, Tensorflow, Django, Flask et Demandes, détaillant leurs utilisations dans le calcul scientifique, l'analyse des données, la visualisation, l'apprentissage automatique, le développement Web et H et H

Les expressions régulières sont des outils puissants pour la correspondance des motifs et la manipulation du texte dans la programmation, améliorant l'efficacité du traitement de texte sur diverses applications.

L'article traite du rôle des environnements virtuels dans Python, en se concentrant sur la gestion des dépendances du projet et l'évitement des conflits. Il détaille leur création, leur activation et leurs avantages pour améliorer la gestion de projet et réduire les problèmes de dépendance.


Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Version crackée d'EditPlus en chinois
Petite taille, coloration syntaxique, ne prend pas en charge la fonction d'invite de code

Navigateur d'examen sécurisé
Safe Exam Browser est un environnement de navigation sécurisé permettant de passer des examens en ligne en toute sécurité. Ce logiciel transforme n'importe quel ordinateur en poste de travail sécurisé. Il contrôle l'accès à n'importe quel utilitaire et empêche les étudiants d'utiliser des ressources non autorisées.

mPDF
mPDF est une bibliothèque PHP qui peut générer des fichiers PDF à partir de HTML encodé en UTF-8. L'auteur original, Ian Back, a écrit mPDF pour générer des fichiers PDF « à la volée » depuis son site Web et gérer différentes langues. Il est plus lent et produit des fichiers plus volumineux lors de l'utilisation de polices Unicode que les scripts originaux comme HTML2FPDF, mais prend en charge les styles CSS, etc. et présente de nombreuses améliorations. Prend en charge presque toutes les langues, y compris RTL (arabe et hébreu) et CJK (chinois, japonais et coréen). Prend en charge les éléments imbriqués au niveau du bloc (tels que P, DIV),

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

MantisBT
Mantis est un outil Web de suivi des défauts facile à déployer, conçu pour faciliter le suivi des défauts des produits. Cela nécessite PHP, MySQL et un serveur Web. Découvrez nos services de démonstration et d'hébergement.