使用redis怎麼做訊息佇列
首先redis它的設計是用來做快取的,但是由於它本身的某種特性使得他可以用來做訊息佇列。它有幾個阻塞式的API可以使用,而正是這些阻塞式的API讓他有做訊息佇列的能力。
試想一下在」資料庫解決所有問題「的思路下,不使用訊息佇列也是可以完成你的需求的。我們把任務全部存放在資料庫然後透過不斷的輪詢方式來取任 務處理。這種做法雖然可以完成你的任務但是做法很粗劣。但是如果你的資料庫介面提供一個阻塞的方法那麼就可以避免輪詢操作了,你的資料庫也可以用來做訊息 隊列,只不過目前的資料庫還沒有這樣的介面。
另外做訊息佇列的其他特性例如FIFO也很容易實現,只需要一個List物件從頭取數據,從尾部塞資料即可實現。
redis能做訊息佇列得益於他list物件blpop brpop介面以及Pub/Sub(發佈/訂閱)的某些介面。他們都是阻塞版的,所以可以用來做訊息佇列。
rabbitmq的優先權做法
目前成熟的訊息佇列產品有很多,著名的例如rabbitmq。它使用起來相對還是比較簡單的,功能也相對比較豐富,一般場合下是完全夠用的。但是有個很煩人的就是它不支援優先順序。
例如一個發送郵件的任務,某些特權用戶希望它的郵件能夠更及時的發送出去,至少比普通用戶要優先對待。預設情況下rabbitmq是無法處理掉 的,丟給rabbitmq的任務都是FIFO先進先出。但是我們可以使用一些變通的技巧來支援這些優先順序。建立多個佇列,並為rabbitmq的消費者設定 置對應的路由規則。
例如預設有這樣一個佇列,我們拿list來模擬 [task1, task2, task3],消費者輪流依照FIFO的原則一個個拿出task來處理掉。如果有高優先權的任務進來,它也只能跟在最後被處理[task1, task2, task3, higitask1]. 但是如果使用兩個隊列,一個高優先權隊列,一個普通優先權隊列。 一般優先權[task1, task2, task3], 高優先權[hightask1 ] 然後我們設定消費者的路由讓消費者隨機從任意佇列中取資料即可。
並且我們可以定義一個專門處理高優先權隊列的消費者,它在空閒的時候也不處理低優先權隊列的資料。這類似銀行的VIP櫃檯,普通客戶在銀行取號排隊,一個VIP來了他雖然沒有從取號機裡拿出一個排在普通會員前面的票,但是他還是可以更快地直接走VIP通道。
使用rabbitmq來做支援優先權的訊息佇列的話,就像是上面所述同銀行VIP會員一樣,走不同的通道。但是這種方式只是相對的優先級,做不到絕對的優先級控制,例如我希望某一個優先級高的任務在絕對意義上要比其他普通任務優先處理掉,這樣上面的方案是行不通的。因為rabbitmq的消費者 只知道再自己空閒的情況下從自己關心的隊列中「隨機」取某一個隊列裡面的第一個資料來處理,它沒辦法控制優先取找哪一個隊列。或是更加細粒度的優先權控制。 或是你係統裡面設定的優先順序有10多種。這樣使用rabbitmq也是很難實現的。
但是如果使用redis來做佇列的話上面的需求都可以實現。
為什麼需要在訊息佇列
系統中引入訊息佇列機制是對系統一個非常大的改善。例如一個web系統中,使用者做了某項操作後需要發送郵件通知到使用者信箱。你可以使用同步方式讓用戶等待郵件發送完成後回饋給用戶,但這樣可能會因為網路的不確定性造成用戶長時間的等待而影響用戶體驗。
有些場景下是不可能使用同步方式等待完成的,那些需要後台花費大量時間的操作。例如極端例子,一個線上編譯系統任務,後台編譯完成需要30分鐘。這種場景的設計不可能同步等待後在回饋,必須是先回饋使用者隨後非同步處理完成,再等待處理完成後根據情況再此回饋使用者與否。
另外適用訊息佇列的情況是那些系統處理能力有限的情況下,先使用佇列機制把任務暫時存放起來,系統再一個個輪流處理掉排隊的任務。這樣在系統吞吐量不足的情況下也能穩定的處理掉高併發的任務。
訊息佇列可以用來做排隊機制,只要係統需要用到排隊機制的地方就可以使用訊息佇列來做。
redis訊息佇列優先權的實作
一些基礎redis基礎知識的說明
redis> blpop tasklist 0 "im task 01"
這個例子使用blpop指令會阻塞方式地從tasklist列表中取頭一個數據,最後一個參數就是等待超時的時間。如果設定為0則表示無限等 待。另外redis存放的資料都只能是string類型,所以在任務傳遞的時候只能是傳遞字串。我們只需要簡單的將負責資料序列化成json格式的字元 字串,然後消費者那邊再轉換一下即可。
這裡我們的範例語言使用python,連結redis的函式庫使用redis-py. 如果你有些程式設計基礎把它切換成自己喜歡的語言應該是沒問題的。
1.簡單的FIFO佇列
import redis, time def handle(task): print task time.sleep(4) def main(): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool) while 1: result = r.brpop('tasklist', 0) handle(result[1]) if name == "main": main()
上範例即使一個最簡單的消費者,我們透過一個無限迴圈不斷地從redis的佇列中取資料。如果佇列中沒有資料則沒有逾時的阻塞在那裡,有資料則取出往下執行。
一般情況取出來是個複雜的字串,我們可能需要將其格式化後作為再傳給處理函數,但是為了簡單我們的例子就是一個普通字串。另外範例中的處理函數不做任何處理,只是sleep 用來模擬耗時的操作。
我們另開一個redis的客戶端來模擬生產者,自備的客戶端就可以。多往tasklist 隊列裡面塞上一些資料。
redis> lpush tasklist 'im task 01' redis> lpush tasklist 'im task 02' redis> lpush tasklist 'im task 03' redis> lpush tasklist 'im task 04' redis> lpush tasklist 'im task 05'
隨後在消費者端便會看到這些模擬出來的任務被挨個消費掉。
2.簡單優先權的佇列
假設一個簡單的需求,只需要高優先權的比低優先權的任務率先處理掉。其他任務之間的順序一概不管,這種我們只需要在遇到高優先級任務的時候將它塞到隊列的前頭,而不是push到最後面即可。
因為我們的隊列是使用的redis的 list,所以很容易實現。遇到高優先級的使用rpush 遇到低優先順序的使用lpush
redis> lpush tasklist 'im task 01' redis> lpush tasklist 'im task 02' redis> rpush tasklist 'im high task 01' redis> rpush tasklist 'im high task 01' redis> lpush tasklist 'im task 03' redis> rpush tasklist 'im high task 03'
隨後會看到,高優先順序的總是比低優先順序的率先執行。但是這個方案的缺點是高優先順序的任務之間的執行順序是先進後出的。
3.較為完善的佇列
範例2中只是簡單的將高優先權的任務塞到佇列最前面,低優先權的塞到最後面。這樣保證不了高優先任務之間的順序。
假設當所有的任務都是高優先級的話,那麼他們的執行順序將是相反的。這樣明顯違背了隊列的FIFO原則。
不過只要稍加改進就可以完善我們的佇列。
跟使用rabbitmq一樣,我們設定兩個佇列,一個高優先權一個低優先權的佇列。高優先權任務放到高佇列中,低的放在低優先權佇列中。 redis和rabbitmq不同的是它可以要求佇列消費者從哪個佇列裡面先讀。
def main(): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool) while 1: result = r.brpop(['high_task_queue', 'low_task_queue'], 0) handle(result[1])
上面的程式碼,會阻塞地從'high_task_queue', 'low_task_queue'這兩個佇列裡面取數據,如果第一個沒有再從第二個裡面取。
所以只需要將隊列消費者做這樣的改進便可以達到目的。
redis> lpush low_task_queue low001 redis> lpush low_task_queue low002 redis> lpush low_task_queue low003 redis> lpush low_task_queue low004 redis> lpush high_task_queue low001 redis> lpush high_task_queue low002 redis> lpush high_task_queue low003 redis> lpush high_task_queue low004
透過上面的測試看到,高優先級的會被率先執行,並且高優先級之間也是保證了FIFO的原則。
這種方案我們可以支援不同階段的優先權佇列,例如高中低三個等級或更多的等級都可以。
4.優先等級很多的情況
假設有個這樣的需求,優先順序不是簡單的高中低或0-10這些固定的等級。而是類似0-99999這麼多等級。那麼我們第三種方案就不太適合了。
雖然redis有sorted set這樣的可以排序的資料型別,看是很可惜它沒有阻塞版的介面。於是我們還是只能使用list類型透過其他方式來完成目的。
有個簡單的做法我們可以只設定一個佇列,並保證它是依照優先權排序號的。然後透過二分找出法找出一個任務合適的位置,並透過 lset 指令插入到對應的位置。
例如佇列裡麵包含著寫優先權的任務[1, 3, 6, 8, 9, 14],當有個優先權為7的任務過來,我們透過自己的二分演算法一個個從佇列裡面取資料出來反和目標資料比對,計算出對應的位置然後插入到指定地點即可。
因為二分查找是比較快的,而redis本身也都在記憶體中,理論上速度是可以保證的。但是如果說資料量確實很大的話我們也可以透過一些方式來調優。
回想我們第三種方案,把第三種方案結合起來就會很大程度減少開銷。例如資料量十萬的佇列,它們的優先權也是隨機0-十萬的區間。我們可以設定 10個或100個不同的佇列,0-一萬的優先權任務投放到1號佇列,一萬-二萬的任務投放到2號佇列。這樣將一個佇列以不同等級拆分後它單一佇列的資料 就減少許多,這樣二分查找匹配的效率也會高一點。但是資料所佔的資源基本上是不變的,十萬資料該佔多少記憶體還是多少。只是系統裡面多了一些隊列而已。
以上是如何實作redis佇列優先權程式碼實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!