由於想用Python實現一套分散式系統,來管理和監控CDN的內容與運作狀態,誤打誤撞認識了RabbitMQ,推薦的人很多,如餘鋒《我為什麼要選擇RabbitMQ》等等。
在MQ這個詞彙映入眼簾的時候,我花了好幾個小時去閱讀什麼是MQ,也就是Message Queue(訊息佇列)。顧名思義,訊息佇列,裝滿訊息的佇列,佇列,資料結構名詞,具備先進先出特性的一種資料結構。訊息佇列無非是用來傳遞訊息的而已,那麼其意義何在,應用場景又在哪裡,具備什麼特性,及其獨特優勢是什麼,為什麼要用,這些都是浮現在我腦海中的一連串問題。
經過一大串搜索,終於膚淺的理解訊息隊列是應用對應用的訊息傳遞的溝通方式。例如需要分析多台伺服器的日誌,完全可以每台伺服器都用一個進程往一個Mysql資料庫的表裡面寫入數據,也就是所需要的信息,然後再寫幾個進程,讀取表裡數據,進行數據分析不是挺好,不過很快,這種設計的醜陋之處就顯現出來了…… 你想要多個程式從一個佇列當中取資料來處理?沒問題,我們硬編碼程式的個數好了……什麼?還要能夠允許程式動態地增加和減少的時候動態進行壓力分配?這是很流行的《RabbitMQ+Python入門經典 兔子與兔子窩》裡面的例子。想想也是,當我的CDN傳輸來一大堆資料的時候,資料的分發,處理,所有的一切都會是問題。但其實我還是沒明白,Rabbit到底是怎麼樣實現這些東西的。
概念上說,RabbitMQ是AMPQ(高階訊息協定佇列)的標準實現,據說不熟悉AMQP,會看不懂RabbitMQ的文檔。但也只能建大的理解關鍵概念了。整個RabbitMQ的實作原理模型見下圖,其實就是一個有路由任務分發佇列的生產者與消費者模型。如圖所示,即生產者生產出相應的訊息,發送給路由器,路由器根據訊息中的關鍵Key訊息,將訊息分發到不同的訊息隊列中,再由消費者去不同的訊息隊列中讀取數據的過程。
Broker:簡單來說就是訊息佇列伺服器實體。
Exchange:訊息交換機,它指定訊息依什麼規則,路由到哪個佇列。
Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
Binding:綁定,它的作用就是把exchange和queue依照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
vhost:虛擬主機,一個broker裡可以開立多個vhost,用來當作不同使用者的權限分離。
producer:訊息生產者,就是投遞訊息的程式。
consumer:訊息消費者,就是接受訊息的程式。
channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
訊息佇列的使用過程大概如下:
(1)客戶端連接到訊息佇列伺服器,開啟一個channel。
(2)客戶端聲明exchange,並設定相關屬性。
(3)客戶端宣告一個queue,並設定相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關係。
(5)客戶端投遞訊息到exchange。
了解了RabbitMQ大概流程與優勢之後,我開始在自己的Fedora上進行RabbitMQ的試用。
根據RabbitMQ官網介紹,安裝需要從此處下載
http://www.rabbitmq.com/download.html點擊server-3.3.0-1.noarch.rpm
進入下載路徑,/home/sun5495/Downloads/
[sun5495@localhost Downloads]# sudo chmod 777 rabbitmq-server-3.3.0-1.no.3.0-1.no.3.
更改可執行檔權限,增加執行權限。 然後執行./rabbitmq-server-3.3.0-1.noarch.rpm,運行報錯,無法安裝。 原來是現需要安裝Erlang才可以,試試此指令 yum install erlang 搞定。 然後使用root用戶執行rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc yum install rabbitmq-server-3.3.0-1.noarch.rpm安裝成功。 為了設定RBMQ開機啟動,使用管理員帳號執行
chkconfig rabbitmq-server on開啟和關閉server使用指令
/sbin/service rabbitmq-server stop/start結果開啟的時候報錯如下
Starting rabbitmq-server (via system forctl): forftlion for; See 'systemctl status rabbitmq-server.service' and 'journalctl -xn' for details. [FAILED]
使用journalctl -xn命令打开日志,查看了下貌似是Erlang的某个文件拒绝访问,然后还提出了一大堆建议。
尝试一下
grep beam.smp /var/log/audit/audit.log | audit2allow -M mypolsemodule -i mypol.pp/sbin/service rabbitmq-server start
既然RabbitMQ安装也运行成功了,那么我就来尝尝RabbitMQ的鲜吧。就从官网上的例子一步一笔来做好了。
由于我使用的是Python,那么就需要安装一些支持RabbitMQ的库来进行操作,其中包括
py-amqplib,txAMQP,pika这几种,按照官网的tutorial,我也安装了pika。
pip install pika
从最简单的收发消息开始。即一端发送消息,一端接收消息。
发送方即生产者,首先要创建与RabbitMQ服务器的连接,
#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost'))channel = connection.channel()
此处服务器连接本地localhost,也可以指定ip或者主机名。
其次发送方需要声明一个队列,比如命名为sayhello
channel.queue_declare(queue='sayhello')
此时我们就可以发送消息了。由于第一个小案例比较简单,没有经过路由器,因此发送消息时,指定路由为空即可。
channel.basic_publish(exchange='', routing_key='hello', body='hello world')print "Sent ‘hello world'"
最后关闭连接即可。
connection.close()
接收方即消费者,需要从队列上获取数据,因此也需要绑定一个队列
channel.queue_declare(queue='hello')
同时,由于接收方的工作方式是基于队列的消息执行一个回调函数,当收到消息时,Pika就会执行相应的回调函数,因此我们需要定义此函数。
def callback(ch, method, properties, body): print " [x] Received %r" % (body,)
接下来我们需要初始化这个消费者,并对消费者进行启动。
channel.basic_consume(callback, queue='hello', no_ack=True)print ' [*] Waiting for messages. To exit press CTRL+C'channel.start_consuming()
OK执行成功。
接下来就逐步的深入体验RabbitMQ的独特魅力。
以上就是Python操作RabbitMQ初体验(一)的内容,更多相关内容请关注PHP中文网(www.php.cn)!