首頁  >  文章  >  後端開發  >  解析Python實作MQ訊息佇列以及訊息佇列的優點

解析Python實作MQ訊息佇列以及訊息佇列的優點

巴扎黑
巴扎黑原創
2017-08-16 13:37:046139瀏覽

訊息佇列(MQ,Message Queue)在訊息資料傳輸中的保存作用為資料通訊提供了保障和即時處理上的便利,這裡我們就來看一下Python中線程的MQ訊息佇列實作以及訊息佇列的優點解析

「訊息佇列」是在訊息的傳輸過程中保存訊息的容器。訊息佇列管理器在將訊息從它的來源中繼到它的目標時充當中間人。佇列的主要目的是提供路由並保證訊息的傳遞;如果發送訊息時接收者不可用,訊息佇列會保留訊息,直到可以成功地傳遞它。相信對任何架構或應用來說,訊息佇列都是一個至關重要的元件,以下是十個理由:

Python的訊息佇列範例:

1.threading+Queue實現執行緒佇列

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消费者端,从队列中获取num
   num = self.queue.get()
   print "i'm num %s"%(num)
   time.sleep(1)
   #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
   self.queue.task_done()
 
start = time.time()
def main():
 #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往队列中填错数据 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

執行結果:

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

解讀:

#具體工作步驟說明如下:

1,建立一個Queue.Queue() 的實例,然後使用資料對它進行填充。

2,將經過填充資料的實例傳遞給線程類,後者是透過繼承 threading.Thread 的方式創建的。

3,產生守護線程池。

4,每次從佇列中取出一個項目,並使用該執行緒中的資料和 run 方法來執行相應的工作。

5,在完成這項工作之後,使用 queue.task_done() 函數向任務已經完成的佇列發送一個訊號。

6,對佇列執行 join 操作,實際上意味著等到佇列為空,再退出主程式。

在使用這個模式時需要注意一點:透過將守護執行緒設為 true,程式運行完自動退出。好處是在退出之前,可以對佇列執行 join 操作、或等到佇列為空。

2.多個佇列

所謂多個佇列,一個佇列的輸出可以作為另一個佇列的輸入

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中取消息
      num = self.queue.get()
      bkeep = num
      
      #将bkeep放入队列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中获取消息并赋值给bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

運行結果:

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

 

解讀:

ThreadNum 類別工作流程

#定義佇列--->繼承threading---->初始化queue---->定義run函數--->get queue中的資料---->處理資料---->put資料到另一個queue-->發訊號告訴queue該條處理完畢

 

main函數工作流程:

--->往自訂queue中丟資料

--->for迴圈確定啟動的執行緒數-- -->實例化ThreadNum類別---->啟動線程並設定守護

--->for循環決定啟動的線程數---->實例化PrintLove類別--- >啟動執行緒並設定為守護

--->等待queue中的訊息處理完畢後執行join。即退出主程式。

了解了MQ的大概實現以後,我們來總結一下訊息佇列的優點:

1. 解耦

在專案啟動之初來​​預測未來專案會碰到什麼需求,是極為困難的。訊息佇列在處理過程中間插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守相同的介面約束。

2. 冗餘

有時在處理資料的時候處理過程會失敗。除非資料被持久化,否則將永遠遺失。訊息佇列把資料持久化直到它們已經完全處理,透過這種方式規避了資料遺失風險。在被許多訊息佇列所採用的"插入-獲取-刪除"範式中,在把一個訊息從佇列中刪除之前,需要你的處理過程明確的指出該訊息已經被處理完畢,確保你的資料被安全的保存直到你使用完畢。

3. 擴展性

因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變程式碼、不需要調節參數。擴充就像調大電力按鈕一樣簡單。

4. 靈活性 & 峰值處理能力

當你的應用程式上了Hacker News的首頁,你將發現訪問流量攀升到一個不同尋常的水平。在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為 以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住成長的存取壓力,而不是因為超出負荷的請求而完全崩潰。 請查看我們關於峰值處理能力的部落格文章以了解更多此方面的資訊。

5. 可恢復性

當體系的一部分元件失效,不會影響整個系統。訊息佇列降低了進程間的耦合度,所以即使一個處理訊息的進程掛掉,加入佇列中的訊息仍然可以在系統復原後被處理。而這種允許重試或延後處理請求的能力通常是造就一個略感不便的使用者和一個沮喪透頂的使用者之間的區別。

6. 送達保證

訊息佇列提供的冗餘機制保證了訊息能被實際的處理,只要一個行程讀取了該佇列即可。在此基礎上,IronMQ提供了一個"只送達一次"保證。無論有多少進 程在從隊列中領取數據,每個訊息只能被處理一次。這之所以成為可能,是因為取得一個訊息只是"預定"了這個訊息,暫時把它移出了隊列。除非客戶端明確的 表示已經處理完了這個訊息,否則這個訊息會被放回佇列中去,在一段可設定的時間之後可再次處理。

7.排序保證

在許多情況下,資料處理的順序都很重要。訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。 IronMO保證訊息漿糊透過FIFO(先進先出)的順序來處理,因此訊息在佇列中的位置就是從佇列中擷取他們的位置。

8.緩衝

在任何重要的系統中,都會有需要不同的處理時間的元素。例如,載入一張圖片比應用濾鏡花費更少的時間。訊息佇列透過一個緩衝層來幫助任務最高效率的執行--寫入佇列的處理會盡可能的快速,而不受從佇列讀取的預備處理的約束。此緩衝有助於控制和最佳化資料流經過系統的速度。

9. 理解資料流

在一個分散式系統裡,要得到一個關於使用者操作會用多長時間及其原因的總體印象,是個巨大的挑戰。訊息系列透過訊息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的資料流都不夠優化。

10. 非同步通訊

很多時候,你不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,讓你把一個訊息放入佇列,但不會立即處理它。你想向隊列中放入多少訊息就放多少,然後在你樂意的時候再去處理它們。

以上是解析Python實作MQ訊息佇列以及訊息佇列的優點的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn