RabbitMQ: メッセージを受信して配信するもので、「郵便局」とみなすことができます。送信者と受信者はキューを介して対話します。キューのサイズは無制限であると考えられます。複数の送信者がキューにメッセージを送信でき、複数の受信者がキューからメッセージを受信することもできます。
rabbitmq で使用されるプロトコルは amqp で、Python の推奨クライアントは pika
pip install pika -i https://pypi.douban.com/simple/
send.py
# coding: utf8 import pika # 建立一个连接 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 连接本地的RabbitMQ服务器 channel = connection.channel() # 获得channel
他のマシン上のサーバーに接続したい場合は、ここにあるリンクがあります。 just fill in アドレスまたはホスト名を入力するだけです。
次にメッセージの送信を開始します。メッセージを受け入れるキューが存在することを確認してください。存在しない場合、rabbitMQ はメッセージを破棄します
channel.queue_declare(queue='hello') # 在RabbitMQ中创建hello这个队列 channel.basic_publish(exchange='', # 使用默认的exchange来发送消息到队列 routing_key='hello', # 发送到该队列 hello 中 body='Hello World!') # 消息内容 connection.close() # 关闭 同时flush
RabbitMQ はデフォルトで 1GB の空きディスク容量を必要とし、それ以外の場合は送信が失敗します。
このとき、ローカルキューhelloにメッセージが格納されています。rabbitmqctl list_queuesを使用すると、helloキューにメッセージが格納されていることを示す
receive.py
hello 1
が表示されます。最初にサーバーに接続した方が良いです。前に送信したときと同じです
# coding: utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
ワークキュー(タスクキュー)
channel.queue_declare(queue='hello') # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错 def callback(ch, method, properties, body): # 用于接收到消息后的回调 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', # 收指定队列hello的消息 no_ack=True) #在处理完消息后不发送ack给服务器 channel.start_consuming() # 启动消息接受 这会进入一个死循环
メッセージの配布方法はポーリングです。つまり、各ワーカー プロセスは同じ数のメッセージを取得します。
メッセージ確認
メッセージの損失を防ぐために、rabbitmq は ack を提供します。つまり、ワーカー プロセスがメッセージを受信して処理した後、rabbitmq に ack を送信して、現時点でメッセージをキューから削除できることを Rabbitmq に通知します。ワーカー プロセスが停止し、rabbitmq が ack を受信しない場合、メッセージは他のワーカー プロセスに再配布されます。タイムアウトを設定する必要がなく、長時間かかるタスクでも処理できます。
ack はデフォルトで有効になっています。以前、ワーカー プロセスは ack:
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
メッセージ永続性
を使用して no_ack=True
channel.basic_consume(callback, queue='hello') # 会启用ack
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
channel.basic_publish (exchange='',
channel.queue_declare(queue='task_queue', durable=True)
ただし、RabbitMQ がメッセージを受信したばかりで、それを保存する時間がなかった場合でも、メッセージは失われます。同時に、RabbitMQ は受信したすべてのメッセージを保存しません。より完全な保証が必要な場合は、発行者確認を使用する必要があります。
公平なメッセージ配信 ポーリング モードでのメッセージ配信は公平ではない可能性があります。たとえば、奇数のメッセージが重いタスクである場合、一部のプロセスは常に重いタスクを実行します。たとえば、特定のワーカー プロセスにバックログのメッセージがある場合でも、多くの ack が送信されませんが、RabbitMQ は引き続きメッセージを受信プロセスに追加できます:routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
channel.basic_qos(prefetch_count=1)
channel.exchange_declare(exchange='logs', type='fanout') # 该exchange会把消息发送给所有它知道的队列中このように、result.method.queue はキュー名であり、 Exchange と queue
result = channel.queue_declare() # 创建一个随机队列 result = channel.queue_declare(exclusive=True) # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它 queue_name = result.method.queueのログをバインドしてコピーを送信します。メッセージを送信するときは、こんにちはメッセージを送信するときは、新しく作成されたログ交換を使用します
channel.queue_bind(exchange='logs', queue='hello')ルーティング以前はバインドを使用して、交換とキューの関係を確立しました(キューは交換からのメッセージに関心があります) )、バインドするときに routing_key オプションを指定することもできます直接交換を使用して、ルーティング キーに対応するメッセージを同じルーティング キーにバインドされたキューに送信します
channel.basic_publish(exchange='logs', routing_key='', body=message)
。
channel.exchange_declare(exchange='direct_logs', type='direct')受信関数で対応する重大度をバインドします:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)トピック交換を使用します前に使用した直接交換は 1 つのルーティング キーのみをバインドできます。これを使用して、ルーティング キーのトピック交換を開くことができます。例:
"stock.usd.nyse" "nyse.vmw"
和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:
* 代表1个单词 # 代表0个或多个单词
如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。
Q1: *.orange.* 对应的是中间的colour都为orange的 Q2: *.*.rabbit 对应的是最后部分的species为rabbit的 lazy.# 对应的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。
在远程机器上运行一个函数然后获得结果。
1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复
注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 发出调用 while self.response is None: # 这边就相当于阻塞了 self.connection.process_data_events() # 查看回调队列 return int(self.response)
3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复
channel.basic_consume(on_request, queue='rpc_queue') # 绑定 等待请求 # 处理之后: ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # 发送回复到回调队列 ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作
if self.corr_id == props.correlation_id: self.response = body
以上がRabbitMQ クイック スタート Python チュートリアルの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。