首頁 >後端開發 >Python教學 >Python操作RabbitMQ伺服器實作訊息佇列的路由

Python操作RabbitMQ伺服器實作訊息佇列的路由

高洛峰
高洛峰原創
2017-03-01 14:03:111308瀏覽

RabbitMQ是一個訊息佇列伺服器,這裡我們針對Python+Pika+RabbitMQ的伺服器端環境,來看一下如何使用Python操作RabbitMQ伺服器實現訊息佇列的路由功能

Python使用Pika函式庫(安裝:sudo pip install pika)可以操作RabbitMQ訊息佇列伺服器(安裝:sudo apt-get install rabbitmq-server),這裡我們來看看MQ相關的路由功能。

路由鍵的實作

例如有一個需要給所有接收端發送訊息的場景,但是如果需要自由定制,有的訊息發給其中一些接收端,有些訊息發送給另外一些接收端,該怎麼辦呢?這種情況下就要用到路由鍵了。

路由鍵的工作原理:每個接收端的訊息佇列在綁定交換器的時候,可以設定對應的路由鍵。當傳送端透過交換器傳送訊息時,可以指明路由鍵 ,交換機會根據路由鍵把訊息傳送到對應的訊息佇列,這樣接收端就能接收到訊息了。

這邊繼上一篇,還是用send.py和receive.py來模擬實作路由鍵的功能。 send.py表示發送端,receive.py表示接收端。實例的功能就是將info、warning、error三種層級的資訊傳送到不同的接收端。

send.py程式碼分析

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange='messages', type='direct')
 
#定义三个路由键
routings = ['info', 'warning', 'error']
 
#将消息依次发送到交换机,并设置路由键
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

#receive.py程式碼分析

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange='messages', type='direct')
 
#从命令行获取路由键参数,如果没有,则设置为info
routings = sys.argv[1:]
if not routings:
  routings = ['info']
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

#開啟兩個終端,一個執行程式碼python receive.py info warning,表示只接收info和warning的訊息。另外一個終端運行send.py,可以觀察到接收終端只接收到了info和warning的訊息。如果開啟多個終端運行receive.py,並傳入不同的路由鍵參數,可以看到更明顯的效果。

當接收端正在執行時,可以使用rabbitmqctl list_bindings來查看綁定狀況。

路由鍵模糊匹配
路由鍵模糊匹配,就是可以使用正規表示式,和常用的正規表示式不同,這裡的話「#」表示所有、全部的意思;「*」只配對到一個字。看完範例就能明白了。

這邊繼上面的例子,還是用send.py和receive.py來實現路由鍵模糊匹配的功能。 send.py表示發送端,receive.py表示接收端。實例的功能大概是這樣:例如你有個知心好朋友,不管開心、傷心、工作上的還是生活上的事情都可以和她說;還有一些朋友可以分享開心的事情;還有一些朋友,你可以把不開心的事跟她說。

send.py程式碼分析

因為要進行路由鍵模糊匹配,所以交換器的類型要設定為topic,設定為topic,就可以使用#,*的匹配符號了。

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange='messages', type='topic')
 
#定义路由键
routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']
 
#将消息依次发送到交换机,并设定路由键
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

上例中定義了四種類型的訊息,容易理解,就不解釋了,然後依序發送出去。

receive.py程式碼分析

同樣,交換器的類型要設定為topic就可以了。從命令列接收參數的功能稍微調整了一下,就是沒有參數時報錯退出。

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange='messages', type='topic')
 
#从命令行获取路由参数,如果没有,则报错退出
routings = sys.argv[1:]
if not routings:
  print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],)
  exit()
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

開啟四個終端,一個運行如下,表示任何事情都可以和她說:

python receive.py "#"

另外一個終端運行如下,表示可以和她分享開心的事:

python receive.py "happy.*"

第三個運行如下,表示工作上的事情可以和她分享:

python receive.py "*.work"

最後一個運行python send.py。結果不難想像出來,就不貼出來了。

更多Python操作RabbitMQ伺服器實作訊息佇列的路由相關文章請關注PHP中文網!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn