Maison > Article > développement back-end > Python exploite le retour des résultats à distance de la file d'attente de messages du serveur RabbitMQ
RabbitMQ est un serveur basé sur MQ. Python peut être utilisé pour le contrôle de programme via la bibliothèque Pika. Nous expliquerons ici en détail le retour des résultats à distance du fonctionnement de Python de la file d'attente de messages du serveur RabbitMQ :
Allons-y. parlez d'abord du test de l'auteur. Environnement : Ubuntu14.04 Python 2.7.4
Serveur RabbitMQ
sudo apt-get install rabbitmq-server
Python utilisant RabbitMQ nécessite la bibliothèque Pika
sudo pip install pika
Retour du résultat à distance
Aucun résultat n'est renvoyé une fois que l'expéditeur du message a envoyé le message. Si vous envoyez simplement un message, il n'y a bien sûr aucun problème, mais dans la pratique, le destinataire doit souvent traiter le message reçu et le renvoyer à l'expéditeur.
Description de la méthode de traitement : Avant d'envoyer des informations, l'expéditeur génère une file d'attente temporaire pour recevoir les messages. Cette file d'attente est utilisée pour recevoir les résultats renvoyés. En fait, les concepts d'extrémité réceptrice et d'expéditeur sont ici relativement flous, car l'extrémité émettrice doit également recevoir des messages, et l'extrémité réceptrice doit également envoyer des messages, j'utilise donc ici un autre exemple pour démontrer ce processus.
Exemple de contenu : Supposons qu'il existe un centre de contrôle et un nœud informatique. Le centre de contrôle enverra un nombre naturel N au nœud informatique. Le nœud informatique ajoute 1 à la valeur N et le renvoie au centre de contrôle. . Ici, center.py est utilisé pour simuler le centre de contrôle et computation.py est utilisé pour simuler les nœuds informatiques.
Analyse du code compute.py
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, properties, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
Le code du nœud de calcul est relativement simple. Il convient de mentionner que la réception d'origine. La méthode est Le message est imprimé directement, le calcul de plus un est effectué ici et le résultat est renvoyé au centre de contrôle.
Analyse du code center.py
#!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求,并声明返回队列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)
L'exemple de code ci-dessus définit la file d'attente et la méthode de traitement pour recevoir les données renvoyées, et envoie la requête Lors de l'affectation de la file d'attente à réponse_to, ce paramètre permet d'obtenir la file d'attente de retour dans le code du nœud de calcul.
Ouvrez deux terminaux, l'un exécute le code python calculate.py et l'autre terminal exécute center.py Si l'exécution réussit, vous devriez pouvoir voir l'effet.
Pendant le test, l'auteur a rencontré quelques problèmes mineurs, c'est-à-dire que la file d'attente de retour n'a pas été spécifiée lorsque center.py a envoyé le message. En conséquence, calculate.py a signalé une erreur lors du renvoi des données après. calcul des résultats, indiquant que la clé de routage n'a pas été trouvée et qu'une erreur est signalée lors de la nouvelle exécution. Utilisez Rabbitmqctl list_queues pour vérifier la file d'attente et constater qu'il y a 1 élément de données dans la file d'attente Compute_queue. Ces données seront retraitées à chaque fois que Compute.py est réexécuté. Plus tard, j'ai utilisé /etc/init.d/rabbitmq-server restart pour redémarrer Rabbitmq et tout allait bien.
Identifiant de corrélation
L'exemple précédent a montré un exemple de retour de résultat à distance, mais une chose n'a pas été mentionnée, à savoir l'identifiant de corrélation. Qu'est-ce que c'est ?
Supposons qu'il existe plusieurs nœuds de calcul et que le centre de contrôle démarre plusieurs threads, envoie des nombres à ces nœuds de calcul, exige les résultats des calculs et les renvoie, mais le centre de contrôle n'ouvre qu'une seule file d'attente et tous les threads démarrent à partir de cette file d'attente. Pour obtenir un message, comment chaque thread détermine-t-il que le message reçu correspond à ce thread ? Il s'agit de l'utilisation de l'identifiant de corrélation. La corrélation se traduit en chinois par corrélation mutuelle, ce qui exprime également ce sens.
Principe de fonctionnement de l'identifiant de corrélation : le centre de contrôle définit l'identifiant de corrélation lors de l'envoi d'une demande de calcul, puis le nœud de calcul renvoie le résultat du calcul avec l'identifiant de corrélation reçu, afin que le centre de contrôle puisse identifier la demande via l'identifiant de corrélation. En fait, l'identifiant de corrélation peut également être compris comme le code d'identification unique de la requête.
Exemple de contenu : le centre de contrôle démarre plusieurs threads et chaque thread lance une demande de calcul grâce à l'identifiant de corrélation, chaque thread peut recevoir avec précision les résultats de calcul correspondants.
Analyse du code compute.py
Par rapport à l'article précédent, un seul endroit doit être modifié : lors du renvoi des résultats du calcul au centre de contrôle, ajoutez le paramètre corrélation_id, la valeur de ce paramètre En fait, il a été envoyé depuis le centre de contrôle, et il vient d'être renvoyé. Le code est le suivant :
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, props, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心,增加correlation_id的设定 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
analyse du code center.py
Le code du centre de contrôle est légèrement plus compliqué, et il y a trois points clés :
Utilisez l'uuid de python pour générer un corrélation_id unique.
Lors de l'envoi d'une demande de calcul, définissez le paramètre corrélation_id.
Définissez un dictionnaire pour enregistrer les données renvoyées, et la valeur clé est le corrélation_id généré par le thread correspondant.
Le code est le suivant :
#!/usr/bin/env python #coding=utf8 import pika, threading, uuid #自定义线程类,继承threading.Thread class MyThread(threading.Thread): def __init__(self, func, num): super(MyThread, self).__init__() self.func = func self.num = num def run(self): print " [x] Requesting increase(%d)" % self.num response = self.func(self.num) print " [.] increase(%d)=%d" % (self.num, response) #控制中心类 class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #返回的结果都会存储在该字典里 self.response = {} #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response[props.correlation_id] = body def request(self, n): corr_id = str(uuid.uuid4()) self.response[corr_id] = None #发送计算请求,并设定返回队列和correlation_id self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = corr_id, ), body=str(n)) #接收返回的数据 while self.response[corr_id] is None: self.connection.process_data_events() return int(self.response[corr_id]) center = Center() #发起5次计算请求 nums= [10, 20, 30, 40 ,50] threads = [] for num in nums: threads.append(MyThread(center.request, num)) for thread in threads: thread.start() for thread in threads: thread.join()
L'auteur a ouvert deux terminaux pour exécuter computing.py, a ouvert un terminal pour exécuter center.py, et enfin La capture d'écran de sortie du résultat est la suivante :
Vous pouvez voir que bien que les résultats obtenus ne soient pas sortis séquentiellement, les résultats correspondent aux données source.
L'exemple ici est de créer une file d'attente et d'utiliser l'identifiant de corrélation pour identifier chaque demande. Il existe également un moyen de ne pas utiliser l'identifiant de corrélation, qui consiste à créer une file d'attente temporaire à chaque fois qu'une requête est effectuée. Cependant, cela consomme trop de performances et n'est pas officiellement recommandé.
Pour plus d'articles sur Python exploitant la file d'attente de messages du serveur RabbitMQ et renvoyant des résultats à distance, veuillez faire attention au site Web PHP chinois !