首頁 >資料庫 >Redis >淺談Redis中訊息佇列和延時訊息佇列的實作方法

淺談Redis中訊息佇列和延時訊息佇列的實作方法

青灯夜游
青灯夜游轉載
2021-12-10 10:02:012732瀏覽

Redis如何實作訊息佇列與延時訊息佇列?以下這篇文章跟大家介紹一下Redis中訊息佇列和延時訊息佇列的實作方法,希望對大家有幫助!

淺談Redis中訊息佇列和延時訊息佇列的實作方法

提到redis,更多的可能想到用作快取的用途,其實redis也可以實作一些簡單的訊息佇列用途,我們可以使用list 資料結構實作佇列。 【相關推薦:Redis影片教學

#list的幾個指令

lpush (left push)

由佇列的左邊存放進去

rpush (right push)

由隊列的右邊存放進去

lpop  (left pop)

由隊列的左邊取出來

rpop (right pop)

由佇列的右邊取出來

以上的四個指令,可以讓list 幫我們實作佇列或棧,佇列的特性是先進先出,堆疊的特性是先進後出,

所以佇列的實作可以使用lpush rpop 或rpush lpop,

堆疊的實作則是lpush lpop 或rpush rpop。

淺談Redis中訊息佇列和延時訊息佇列的實作方法

使用指令來示範佇列

#生產者發布訊息

#首先我們使用rpush 對一個叫做notify-queue的隊列,增加五個元素,即1 2 3 4 5,也就是作為生產者發布消息啦

淺談Redis中訊息佇列和延時訊息佇列的實作方法

消費者消費訊息

既然生產者使用的是rpush,那麼消費者就要用lpop,可以看下圖,我們不停對notify -queue進行訊息消費,而且是按照順序的,從1一直到5,按順序讀出,最終隊列中沒有訊息了,彈出的則一直是空

淺談Redis中訊息佇列和延時訊息佇列的實作方法

空輪詢問題

在上面使用lpop消費訊息時,可以看到,訊息消費完後,我們每次再去pop時,讀到的都是一個空的訊息,

上面是手動執行指令,但如果是寫好的程式碼程式不停的去pop資料(拉取資料)的話,會造成空輪詢(無用的讀取),

既拉高了客戶端的CPU消耗,又拉高了redis的QPS,並且還是無用操作,這些無用操作可能會造成其他客戶端對redis的存取變得響應緩慢。

解決方案A (休眠)

既然空輪詢會讓客戶端和redis的資源消耗都會變得較高,那麼我們可以讓客戶端在​​收到空資料的時候,進行1s的休眠,1s後再進行資料拉取,這樣可以降低消耗

Thread.sleep(1000)

#這個方案也是存在瑕疵的,即訊息消費延遲性增大了,如果只有一個消費者的話,延遲就是1s,即空輪詢後,正好休眠了,但是這時候剛好有消息過來了,還是要等到1s醒來後才能消費,

如果有多個消費者的話,由於每個消費者的睡眠時間是岔開的,會降低一些延遲性,但是有沒有辦法更好的方法,可以做到幾乎0 延遲?

解決方案B (阻塞讀取)

#redis中關於佇列取資料其實還有兩個指令,也就是阻塞讀取,

blpop (blocking left pop)

brpop (blocking right pop)

阻塞讀在佇列沒有資料的時候,會進入休眠狀態,一旦有消息來了以後,則立刻做出反應,讀取數據,因此使用blpop/brpop 替換lpop/rpop 則可以解決訊息延遲性的問題,

繼續入隊3個屬性,6、7、8

淺談Redis中訊息佇列和延時訊息佇列的實作方法

使用blpop 進行隊列的讀取,最後一個參數是阻塞讀的等待時間,如果超過這個時間還沒有消息,將會返回nil,此時可以繼續重複blpop操作,

淺談Redis中訊息佇列和延時訊息佇列的實作方法

阻塞讀取的空閒連線自動斷開問題

客戶端使用阻塞讀取時,如果阻塞的時間過長,服務一般會當成空閒連接,從而對其進行主動斷開,減少無用的連接佔用資源,這個時候客戶端會拋出異常,

所以請注意,在客戶端使用阻塞讀取的時候,要進行異常的捕獲,從而做出相應的處理,例如重試。

java客戶端實作訊息佇列

想法和上述一樣,只是由命令列客戶端redis-cli變成了java語言,一個線程或多個線程進行rpush 的發布,

另外一個或多個執行緒進行blpop 消費,完成的程式碼在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue

#發布者

淺談Redis中訊息佇列和延時訊息佇列的實作方法

訂閱者

淺談Redis中訊息佇列和延時訊息佇列的實作方法

延時佇列的實作想法

延時佇列指的是,訊息發送的一段時間後,再由消費者進行消費,而不是發送過去後,消費者就能立即讀取到,

zset的可以幫我們做到這個事情,首先zset可以透過score進行排序,score可以存一個時間戳,所以我們每次發布訊息的時候,用當前時間戳加上延時的時間戳,

隨後消費者取訊息的時候,透過截取zset的數據,取到已經滿足當前時間的訊息(即取score小於等於當前時間戳記的數據,score小於等於當前時間戳表示訊息已經到時間了,如果大於的話,表示還得等一會兒才能消費)。

關鍵指令zadd (發佈者),zrangebyscore(訂閱者),zrem (訂閱者消費完資料後刪除)

指令實作

#我們使用zadd增加了4個數據,分別是1、2、3秒(偽說法,這裡其實只是個score)後才能消費的數據,還有一個10秒後才能消費的kafka,

淺談Redis中訊息佇列和延時訊息佇列的實作方法

假如現在已經到了第三秒,我們取zset中大於等於1秒的和小於等於3秒的數據,因為這個區間的數據正好是我們可以消費的,可以看到,我們取出了符合條件的3條數據,

淺談Redis中訊息佇列和延時訊息佇列的實作方法

如果每次只能消費一個數據的話,可以加一個limit限制條件,可以看下圖取出了第一個可以消費的數據,redis

淺談Redis中訊息佇列和延時訊息佇列的實作方法

同時注意,和list的lpop/和blpop不同(它們彈出即自動刪除原始隊列裡的該數據),

雖然獲取到了數據,但是如果不使用zrem進行刪除的話,這條數據還會被其他人讀到,因為他還一直存在zset中,

不過zrem可能會發生已經被別人搶先一步刪除(消費)的情況,所以程式碼中還需要根據zrem的回傳值是否大於0判斷,本次訊息我們是否搶佔成功,成功後再進行正確消費。

程式碼實作

發布者

1淺談Redis中訊息佇列和延時訊息佇列的實作方法

訂閱者

1淺談Redis中訊息佇列和延時訊息佇列的實作方法

測試延遲效果

1淺談Redis中訊息佇列和延時訊息佇列的實作方法

完整程式碼位址: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue

優化, 使用lua實作

#上面實現的延遲佇列中,有一個問題,就是使用zrem判斷是否搶到這個資料時,很有可能沒有搶到,這樣繼續進行讀取,可能幾輪都搶不到,資源白白浪費了,所以可以透過lua腳本,來進行最佳化,

讓zrangebyscore 和zrem成為一個原子化操作,這可以避免多執行緒爭搶,搶不到的資源浪費了。

1淺談Redis中訊息佇列和延時訊息佇列的實作方法

1淺談Redis中訊息佇列和延時訊息佇列的實作方法

結語

一些專業的隊列中間件,應用起來會比較複雜並且增加維運成本,例如RabbitMQ,發訊息前需要建立Exchange交換機,再建立Queue,隨後Exchange和Queue要進行綁定,發訊息的時候還要指定routing-key 才能和Exchange匹配,最終到達Queue中,

如果場景簡單的話,可以使用redis實作一個佇列,但是需要注意,redis沒有專業佇列的特性,沒有ack的保證,也就是說訊息是不可靠的,消費失敗後,就沒有了,如果需要百分之百的可靠性,還是需要採用專業的佇列中間件的ack等機製作為保障。

更多程式相關知識,請造訪:程式設計入門! !

以上是淺談Redis中訊息佇列和延時訊息佇列的實作方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除