Heim >Backend-Entwicklung >Python-Tutorial >Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange
Die chinesische Übersetzung von Rabbitmq bezieht sich hauptsächlich auf die Buchstaben mq: Message Queue, was Nachrichtenwarteschlange bedeutet. Davor steht auch das Wort „Kaninchen“, was bedeutet, dass die Python-Sprache ziemlich humorvoll ist. Der Rabbitmq-Dienst ähnelt den MySQL- und Apache-Diensten, die bereitgestellten Funktionen sind jedoch unterschiedlich. Mit rabbimq wird ein Dienst zum Versenden von Nachrichten bereitgestellt, der zur Kommunikation zwischen verschiedenen Anwendungen genutzt werden kann.
Rabbitmq installieren
Unter Ubuntu 12.04 können Sie es direkt über apt-get installieren:
sudo apt-get install rabbitmq-server
Nach der Installation wurde der Rabbitmq-Dienst gestartet. Schauen wir uns als Nächstes ein Beispiel für das Schreiben von „Hello World!“ in Python an. Der Inhalt des Beispiels besteht darin, „Hallo Welt!“ von send.py an Rabbitmq zu senden, und Empfang.py empfängt die von send.py gesendeten Informationen von Rabbitmq.
P steht für Produce, was Produzent bedeutet, und kann auch als Sender bezeichnet werden. Im Beispiel wird es als send.py dargestellt. C steht für Consumer Das heißt, es kann auch als Empfänger bezeichnet werden, was im Beispiel durch „receive.py“ dargestellt wird. Das rote Symbol in der Mitte stellt die Bedeutung der Warteschlange dar, die im Beispiel durch „Hallo Warteschlange“ dargestellt wird.
Python nutzt den Rabbitmq-Dienst. Sie können die vorgefertigten Klassenbibliotheken pika, txAMQP oder py-amqplib verwenden.
Pika installieren
Um Pika zu installieren, können Sie pip verwenden, ein Python-Softwareverwaltungspaket. Wenn es nicht installiert ist, können Sie es über apt- installieren. get
sudo apt-get install python-pip
Pika über Pip installieren:
sudo pip install pika
send.py Der Code
stellt eine Verbindung zum Rabbitmq-Server her. Da er lokal getestet wird, verwenden Sie einfach localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
Deklariert eine Nachrichtenwarteschlange, in der Nachrichten zugestellt werden. Wenn Nachrichten an eine Warteschlange gesendet werden, die nicht existiert, löscht Rabbitmq diese Nachrichten automatisch.
channel.queue_declare(queue='hello')
Senden Sie die Nachricht an die oben deklarierte Hallo-Warteschlange, wobei Exchange den Austauscher darstellt, der genau angeben kann, an welche Warteschlange die Nachricht gesendet werden soll. und Routing_key ist auf die Warteschlange eingestellt. Der Name und der Text sind der zu sendende Inhalt. Auf die spezifischen Sendedetails werden wir vorerst nicht achten.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Verbindung schließen
connection.close()
Vollständiger Code
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
Führen Sie zuerst dieses Programm aus. Wenn die Ausführung erfolgreich ist, sollte Rabbitmqctl die Hallo-Warteschlange erfolgreich hinzufügen und es sollte eine Nachricht in der Warteschlange vorhanden sein um es zu überprüfen.
rabbitmqctl list_queues
Geben Sie die folgenden Informationen auf dem Computer des Autors aus:
Es gibt tatsächlich eine Hallo-Warteschlange und eine Nachricht in der Warteschlange. Verwenden Sie als Nächstes „receive.py“, um die Informationen in der Warteschlange abzurufen.
receive.py-Code
ist derselbe wie die beiden vorherigen Schritte von send.py, die beide zuerst eine Verbindung zum Server und dann die Deklaration der Nachrichtenwarteschlange erfordern , was hier nicht besprochen wird. Derselbe Code wurde gepostet.
Der Empfang von Nachrichten ist komplizierter und erfordert die Definition einer Rückruffunktion, um sie zu verarbeiten. Die Rückruffunktion dient hier zum Ausdrucken der Informationen.
def callback(ch, method, properties, body): print "Received %r" % (body,)
Sagen Sie Rabbitmq, dass es Callback zum Empfangen von Nachrichten verwenden soll
channel.basic_consume(callback, queue='hello', no_ack=True)
Starten Sie den Empfang von Informationen und geben Sie den Blockierungsstatus ein. Erst wenn sich Informationen in der Warteschlange befinden, wird der Rückruf zur Verarbeitung aufgerufen. Drücken Sie Strg+C, um den Vorgang zu beenden.
channel.start_consuming()
Komplettcode
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
Führen Sie das Programm aus und Sie werden es tun in der Lage sein, die Nachricht „Hallo Welt!“ an die „Hallo“-Warteschlange zu senden und dann auf dem Bildschirm auszugeben. Wechseln Sie ein Terminal und führen Sie send.py erneut aus. Sie können sehen, dass „receiver.py“ erneut Informationen empfängt.
Beispiel für eine Arbeitswarteschlange
1. Vorbereitung
Verwenden Sie im Beispielprogramm new_task .py zur Simulation den Task-Allokator und worker.py zur Simulation des Workers.
Ändern Sie send.py, um Informationen von Befehlszeilenparametern zu empfangen und zu senden
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
Ändern Sie die Rückruffunktion von require.py.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
Öffnen Sie hier zunächst zwei Terminals, beide führen worker.py aus und befinden sich im Überwachungsstatus. Dies entspricht zwei Workern. Öffnen Sie das dritte Terminal, führen Sie new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
aus und beobachten Sie, dass worker.py Aufgaben erhält und ein Worker 3 Aufgaben erhält:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
Ein anderer Arbeiter hat 2 Aufgaben erhalten:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange相关文章请关注PHP中文网!