Maison  >  Article  >  développement back-end  >  Utilisation de Python RabbitMQ (2)

Utilisation de Python RabbitMQ (2)

黄舟
黄舟original
2017-01-17 14:51:411281parcourir

L'article précédent présentait l'installation de RabbitMQ et le classique Hello World ! Exemple. Ici, nous aurons une compréhension des files d’attente de travail. Parce qu'il s'agit d'une suite de l'article précédent, si vous n'avez pas lu l'article précédent, cet article peut être difficile à comprendre. L'adresse de l'article précédent est : Comment installer RabbitMQ et Python sur Ubuntu


Les messages peuvent également être compris comme des tâches, et l'expéditeur du message peut être compris comme l'allocateur de tâches et le destinataire du message.Il peut être compris comme un travailleur.Lorsque le travailleur reçoit une tâche et ne l'a pas terminée, l'allocateur de tâches envoie une autre tâche, et elle est trop occupée, donc plusieurs travailleurs sont nécessaires pour gérer ces tâches ensemble. sont appelées files d’attente de travail. Le diagramme de structure est le suivant :

Utilisation de Python RabbitMQ (2)

file d'attente de travail de l'instance Python de Rabbitmq


Préparation


Dans l'exemple de programme, utilisez new_task.py pour simuler l'allocateur de tâches et worker.py pour simuler le travailleur.


Modifiez send.py pour recevoir les informations des paramètres de ligne de commande et envoyez

import sys
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)

Modifiez la fonction de rappel de recevoir.py.

import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"

Ouvrez deux terminaux ici, tous deux exécutent Worker.py et sont en état d'écoute. Cela équivaut à deux Workers. Ouvrez le troisième terminal, exécutez new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

et observez que worker.py reçoit des tâches :

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'

Un autre travailleur a reçu 2 tâches :

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
Du point de vue ci-dessus, chaque travailleur se verra attribuer des tâches à tour de rôle. Ainsi, si un travailleur décède pendant l’exécution d’une tâche, celle-ci n’est pas terminée et doit être confiée à d’autres travailleurs. Il devrait donc y avoir un mécanisme qui fournira un retour d'information lorsqu'un travailleur termine une tâche.

Accusé de réception du message


L'accusé de réception du message est lorsque le travailleur termine la tâche, il sera renvoyé à Rabbitmq. Modifiez la fonction de rappel dans worker.py :

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(5)
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)

Faites une pause ici pendant 5 secondes pour faciliter la sortie de ctrl c.

Vous pouvez également supprimer le paramètre no_ack=True ou le définir sur False.

channel.basic_consume(callback, queue='hello', no_ack=False)
Exécutez avec ce code, même si l'un des Workers ctrl c quitte, les tâches en cours d'exécution ne seront pas perdues et Rabbitmq redistribuera les tâches aux autres Workers.


Durabilité des messages (Durabilité des messages)


Bien qu'il existe un mécanisme de retour de message, si RabbitMQ lui-même se bloque Si vous laissez-le tomber, la mission sera toujours perdue. Par conséquent, les tâches doivent être stockées de manière persistante. Déclarer le stockage persistant :

channel.queue_declare(queue='hello', durable=True)
Mais ce programme exécutera une erreur car la file d'attente hello existe déjà et est non persistante. RabbitMQ ne permet pas l'utilisation de différents paramètres pour redéfinir les files d'attente existantes. Redéfinir une file d'attente :

channel.queue_declare(queue='task_queue', durable=True)
Lors de l'envoi d'une tâche, utilisez delivery_mode=2 pour marquer la tâche comme stockage persistant :


channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))
Répartition équitable


Dans l'exemple ci-dessus, bien que chaque travailleur soit affecté à des tâches à tour de rôle, chaque tâche n'est pas forcément le même. Certaines tâches peuvent être plus lourdes et prendre plus de temps à s'exécuter ; d'autres peuvent être plus légères et prendre plus de temps à s'exécuter. Il serait préférable qu'il puisse être planifié de manière équitable. Utilisez basic_qos pour définir prefetch_count=1 afin que Rabbitmq n'attribue pas plusieurs tâches aux travailleurs en même temps, ce n'est qu'une fois que le travailleur a terminé la tâche qu'il recevra à nouveau la tâche. .

channel.basic_qos(prefetch_count=1)
code complet de new_task.py

#!/usr/bin/env python
import pika
import sys
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
code complet de worker.py

#!/usr/bin/env python
import pika
import time
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Ce qui précède est le contenu de l'utilisation de Python Rabbitmq (2), plus Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois (www.php.cn) !


Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn