Maison >développement back-end >Tutoriel Python >Déploiement de l'environnement Python Pika RabbitMQ et mise en œuvre de la file d'attente de travail
La traduction chinoise de lapinmq fait principalement référence aux lettres mq : Message Queue, qui signifie file d'attente de messages. Il y a aussi le mot "lapin" devant, qui signifie lapin. C'est la même chose que le langage python appelé python. Les étrangers sont assez humoristiques. Le service RabbitMQ est similaire aux services MySQL et Apache, mais les fonctions fournies sont différentes. rabbimq est utilisé pour fournir un service d'envoi de messages, qui peut être utilisé pour communiquer entre différentes applications.
Installez Rabbitmq
Installez d'abord Rabbitmq Sous Ubuntu 12.04, vous pouvez l'installer directement via apt-get :
sudo apt-get install rabbitmq-server
Après l'installation, le service lapinmq a été démarré. Examinons ensuite un exemple d’écriture de Hello World en python. Le contenu de l'exemple est d'envoyer "Hello World!" de send.py à lapinmq, et recevoir.py reçoit les informations envoyées par send.py depuis lapinmq.
P signifie product, qui signifie producteur, et peut également être appelé expéditeur. Dans l'exemple, il est affiché sous la forme send.py ; , ce qui signifie consommateur. Signification, il peut également être appelé le récepteur, qui est représenté par recevoir.py dans l'exemple ; le rouge au milieu représente la signification de la file d'attente, qui est représentée par la file d'attente hello dans l'exemple ;
Python utilise le service Rabbitmq. Vous pouvez utiliser les bibliothèques de classes prêtes à l'emploi pika, txAMQP ou py-amqplib. Ici, j'ai choisi pika.
Installer pika
Pour installer pika, vous pouvez utiliser pip est un package de gestion de logiciel python. S'il n'est pas installé, vous pouvez l'installer via apt-. get
sudo apt-get install python-pip
Installer pika via pip :
sudo pip install pika
send.py Le code
se connecte au serveur Rabbitmq Parce qu'il est testé localement, utilisez simplement localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
Déclare une file d'attente de messages dans laquelle les messages seront livrés. Si des messages sont envoyés vers une file d'attente qui n'existe pas, Rabbitmq effacera automatiquement ces messages.
channel.queue_declare(queue='hello')
Envoyer le message à la file d'attente hello déclarée ci-dessus, où échange représente l'échangeur, qui peut spécifier avec précision à quelle file d'attente le message doit être envoyé, et router_key est défini sur la file d'attente. Le nom et le corps sont le contenu à envoyer. Nous ne prêterons pas attention aux détails spécifiques de l'envoi pour l'instant.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Fermer la connexion
connection.close()
Code complet
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
Exécutez d'abord ce programme. Si l'exécution réussit, Rabbitmqctl devrait ajouter avec succès la file d'attente hello et il devrait y avoir un message dans la file d'attente. Utilisez la commande Rabbitmqctl. pour le vérifier.
rabbitmqctl list_queues
Sortez les informations suivantes sur l'ordinateur de l'auteur :
Il y a bien une file d'attente bonjour, et il y a un message dans la file d'attente. Ensuite, utilisez recevoir.py pour obtenir les informations dans la file d'attente.
le code receive.py
est le même que les deux étapes précédentes de send.py, qui nécessitent toutes deux d'abord de se connecter au serveur, puis de déclarer la file d'attente des messages , qui ne sera pas discuté ici. Publié le même code.
La réception de messages est plus compliquée et vous devez définir une fonction de rappel pour les traiter. La fonction de rappel ici consiste à imprimer les informations.
def callback(ch, method, properties, body): print "Received %r" % (body,)
Dites à Rabbitmq d'utiliser le rappel pour recevoir des messages
channel.basic_consume(callback, queue='hello', no_ack=True)
Commencez à recevoir des informations et entrez dans l'état de blocage. Ce n'est que lorsqu'il y a des informations dans la file d'attente que le rappel sera appelé pour traitement. Appuyez sur ctrl c pour quitter.
channel.start_consuming()
Code complet
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
Exécutez le programme et vous pouvoir recevoir Le message Hello World! est envoyé à la file d'attente hello, puis imprimé à l'écran. Changez de terminal et exécutez à nouveau send.py. Vous pouvez voir que contain.py recevra à nouveau des informations.
Exemple de file d'attente de travail
1 Préparation
Dans l'exemple de programme, utilisez new_task .py pour simuler. l'allocateur de tâches et worker.py pour simuler le travailleur.
Modifiez send.py pour recevoir les informations des paramètres de ligne de commande et envoyez
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
Modifiez la fonction de rappel de recevoir.py.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
Ici, ouvrez d'abord deux terminaux, tous deux exécutent worker.py et sont en état d'écoute. Ce côté équivaut à deux travailleurs. Ouvrez le troisième terminal et exécutez new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
Observez que travailleur.py reçoit des tâches et qu'un travailleur reçoit 3 tâches :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
Un autre travailleur a reçu 2 tâches :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Python+Pika+RabbitMQ环境部署及实现工作队列相关文章请关注PHP中文网!