Maison > Article > développement back-end > Utilisation de Python RabbitMQ (2)
L'article précédent présentait l'installation de RabbitMQ et le classique Hello World ! Exemple. Ici, nous aurons une compréhension des files d’attente de travail. Parce qu'il s'agit d'une suite de l'article précédent, si vous n'avez pas lu l'article précédent, cet article peut être difficile à comprendre. L'adresse de l'article précédent est : Comment installer RabbitMQ et Python sur Ubuntu
Les messages peuvent également être compris comme des tâches, et l'expéditeur du message peut être compris comme l'allocateur de tâches et le destinataire du message.Il peut être compris comme un travailleur.Lorsque le travailleur reçoit une tâche et ne l'a pas terminée, l'allocateur de tâches envoie une autre tâche, et elle est trop occupée, donc plusieurs travailleurs sont nécessaires pour gérer ces tâches ensemble. sont appelées files d’attente de travail. Le diagramme de structure est le suivant :
file d'attente de travail de l'instance Python de Rabbitmq
Préparation
Dans l'exemple de programme, utilisez new_task.py pour simuler l'allocateur de tâches et worker.py pour simuler le travailleur.
Modifiez send.py pour recevoir les informations des paramètres de ligne de commande et envoyez
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
Modifiez la fonction de rappel de recevoir.py.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
Ouvrez deux terminaux ici, tous deux exécutent Worker.py et sont en état d'écoute. Cela équivaut à deux Workers. Ouvrez le troisième terminal, exécutez new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
et observez que worker.py reçoit des tâches :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
Un autre travailleur a reçu 2 tâches :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'Du point de vue ci-dessus, chaque travailleur se verra attribuer des tâches à tour de rôle. Ainsi, si un travailleur décède pendant l’exécution d’une tâche, celle-ci n’est pas terminée et doit être confiée à d’autres travailleurs. Il devrait donc y avoir un mécanisme qui fournira un retour d'information lorsqu'un travailleur termine une tâche. Accusé de réception du message
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
Faites une pause ici pendant 5 secondes pour faciliter la sortie de ctrl c.
Vous pouvez également supprimer le paramètre no_ack=True ou le définir sur False.
channel.basic_consume(callback, queue='hello', no_ack=False)Exécutez avec ce code, même si l'un des Workers ctrl c quitte, les tâches en cours d'exécution ne seront pas perdues et Rabbitmq redistribuera les tâches aux autres Workers.
channel.queue_declare(queue='hello', durable=True)Mais ce programme exécutera une erreur car la file d'attente hello existe déjà et est non persistante. RabbitMQ ne permet pas l'utilisation de différents paramètres pour redéfinir les files d'attente existantes. Redéfinir une file d'attente :
channel.queue_declare(queue='task_queue', durable=True)Lors de l'envoi d'une tâche, utilisez delivery_mode=2 pour marquer la tâche comme stockage persistant :
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))Répartition équitable
channel.basic_qos(prefetch_count=1)code complet de new_task.py
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()code complet de worker.py
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()Ce qui précède est le contenu de l'utilisation de Python Rabbitmq (2), plus Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois (www.php.cn) !