Heim >Backend-Entwicklung >Python-Tutorial >RabbitMQ-Schnellstart-Python-Tutorial

RabbitMQ-Schnellstart-Python-Tutorial

高洛峰
高洛峰Original
2017-03-09 09:28:191918Durchsuche

HelloWorld

Einführung

RabbitMQ: Wenn es Nachrichten annimmt und dann zustellt, kann es als „Postamt“ betrachtet werden. Sender und Empfänger interagieren über Warteschlangen. Die Größe der Warteschlange kann als unbegrenzt angesehen werden. Mehrere Absender können Nachrichten an eine Warteschlange senden, und mehrere Empfänger können auch Nachrichten von einer Warteschlange empfangen.

Code

Das von Rabbitmq verwendete Protokoll ist amqp und der empfohlene Client für Python ist pika

pip install pika -i https://pypi.douban.com/simple/

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel

Der Link hier gilt für diesen Computer. Wenn Sie eine Verbindung zu einem Server auf einem anderen Computer herstellen möchten, geben Sie einfach die Adresse oder den Hostnamen ein.

Als nächstes stellen wir sicher, dass die Warteschlange, die die Nachricht akzeptiert, vorhanden ist, andernfalls verwirft RabbitMQ die Nachricht.

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush

RabbitMQ benötigt standardmäßig 1 GB freien Speicherplatz Das Senden schlägt fehl.

Zu diesem Zeitpunkt wurde eine Nachricht in der lokalen Warteschlange „Hallo“ gespeichert. Wenn Sie Rabbitmqctl list_queues verwenden, können Sie

hello 1

sehen, dass eine Nachricht in der „Hallo“-Warteschlange gespeichert ist

receive.py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

Stellen Sie immer noch zuerst eine Verbindung zum Server her, genau wie beim vorherigen Senden

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环

Arbeitswarteschlange (Aufgabenwarteschlange)

Arbeitswarteschlange wird verwendet, um zeitaufwändige Aufgaben auf mehrere Arbeitsprozesse zu verteilen. Anstatt ressourcenintensive Aufgaben sofort auszuführen (Sie müssen warten, bis diese Aufgaben abgeschlossen sind), planen Sie diese Aufgaben für die spätere Ausführung. Beispielsweise senden wir die Aufgabe als Nachricht an die Warteschlange, starten einen Worker-Prozess, um sie anzunehmen und schließlich auszuführen, und können mehrere Worker-Prozesse starten, um zu arbeiten. Dies gilt für Webanwendungen, bei denen komplexe Aufgaben nicht innerhalb des Verarbeitungsfensters einer http-Anfrage erledigt werden sollen.

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))

Die Art und Weise, Nachrichten zu verteilen, ist Polling, das heißt, jeder Arbeitsprozess erhält die gleiche Anzahl von Nachrichten.

Nachrichtenbestätigung

Wenn eine Nachricht einem Arbeitsprozess zugewiesen wird, der Arbeitsprozess jedoch abstürzt, bevor die Verarbeitung abgeschlossen ist, kann die Nachricht verloren gehen, da Rabbitmq einmal eine Nachricht an den Arbeitsprozess verteilt , löscht es die Nachricht.

Um den Verlust von Nachrichten zu verhindern, stellt Rabbitmq eine Bestätigung bereit. Das heißt, nachdem der Arbeitsprozess die Nachricht empfangen und verarbeitet hat, sendet er eine Bestätigung an Rabbitmq, um Rabbitmq darüber zu informieren, dass die Nachricht zu diesem Zeitpunkt aus der Warteschlange gelöscht werden kann Zeit. Wenn der Arbeitsprozess abstürzt und Rabbitmq die Bestätigung nicht erhält, wird die Nachricht an andere Arbeitprozesse weitergegeben. Es ist nicht erforderlich, ein Timeout festzulegen, auch wenn die Aufgabe lange dauert, kann sie bearbeitet werden.

ack ist standardmäßig aktiviert. Zuvor gab unser Arbeitsprozess no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack

Rückruf mit ack:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

Nachrichtenpersistenz

Aber manchmal wird RabbitMQ neu gestartet und Nachrichten gehen verloren. Die Persistenz kann beim Erstellen der Warteschlange festgelegt werden:

(die Art der Warteschlange kann nach ihrer Festlegung nicht mehr geändert werden)

channel.queue_declare(queue='task_queue', durable=True)
Gleichzeitig muss beim Senden auch das Persistenzattribut der Nachricht festgelegt werden die Nachricht:

channel.basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
Wenn RabbitMQ jedoch gerade eine Nachricht empfangen hat und keine Zeit hatte, sie zu speichern, wird die Nachricht trotzdem angezeigt Gleichzeitig empfängt RabbitMQ nicht jede Nachricht. Wenn Sie eine vollständigere Garantie benötigen, müssen Sie die

Faire Nachrichtenverteilung verwenden Die Nachrichtenverteilung im Modus ist möglicherweise nicht fair, z. B. sind alle ungeraden Nachrichten bei schweren Aufgaben ausgeführt, selbst wenn bei einem bestimmten Arbeitsprozess ein Rückstand an Nachrichten vorhanden ist, die beispielsweise nicht verarbeitet wurden. Viele Bestätigungen wurden nicht gesendet, RabbitMQ sendet weiterhin nacheinander Nachrichten an ihn. Fügen Sie Einstellungen hinzu:

Informieren Sie RabbitMQ, damit, wenn ein Arbeitsprozess keine Bestätigung zurücksendet, keine Nachrichten zugewiesen werden

channel.basic_qos(prefetch_count=1)
Gruppenversand

Im Allgemeinen wird eine Nachricht an einen Arbeitsprozess gesendet und dann abgeschlossen. Manchmal möchte ich eine Nachricht an mehrere Prozesse gleichzeitig senden

Austausch

Sendet der Absender die Nachricht direkt an die Warteschlange? Der Absender weiß tatsächlich nicht, an welche Warteschlange die Nachricht gesendet wird Einerseits empfängt die Vermittlungsstelle die Nachrichten des Produzenten und schiebt sie andererseits in die Warteschlange. Als Vermittlungsstelle müssen Sie also wissen, was zu tun ist, wenn eine Nachricht empfangen wird, und ob sie einer Sondernachricht hinzugefügt werden soll Es gibt Direkt-, Themen-, Header- und Fanout-Typen sowie Massenversand, bei denen der Wert „Fanout“ lautet. Der Standardaustausch wurde verwendet. , kann beim Senden oder Empfangen verwendet werden.

Bind-Austausch und Warteschlange

Protokolle werden beim Senden auch an Hallo gesendet Beim Senden wird die Nachricht über den neu erstellten Protokollaustausch
channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue

weitergeleitet. Bind wurde zuvor verwendet, das heißt, die Beziehung zwischen dem Austausch und der Warteschlange wird hergestellt ist an Nachrichten vom Austausch interessiert. Sie können beim Binden auch die Option „routing_key“ angeben. Verwenden Sie den direkten Austausch

, um die Nachricht, die dem Routing-Schlüssel entspricht, an die Warteschlange zu senden, die an denselben Routing-Schlüssel gebunden ist

channel.queue_bind(exchange='logs',
               queue='hello')
Sendefunktion, Veröffentlichen von Nachrichten mit unterschiedlichem Schweregrad:

Binden Sie den entsprechenden Schweregrad in der Akzeptanzfunktion:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)

Themenaustausch verwenden

Der zuvor verwendete direkte Austausch kann nur zum Binden eines Routing-Schlüssels verwendet werden. Sie können diesen Themenaustausch verwenden, der die Routing-Schlüssel trennt, zum Beispiel:

"stock.usd.nyse" "nyse.vmw"

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body

                                               

Das obige ist der detaillierte Inhalt vonRabbitMQ-Schnellstart-Python-Tutorial. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn