首頁 >資料庫 >Redis >淺析Redis中怎麼使用訊息佇列

淺析Redis中怎麼使用訊息佇列

青灯夜游
青灯夜游轉載
2022-01-05 09:57:352772瀏覽

這篇文章帶大家了解Redis進階用法-訊息隊列,介紹一下Redis中的延時隊列,希望對大家有幫助!

淺析Redis中怎麼使用訊息佇列

說到訊息佇列中間件,我們都會想到RabbitMQ、RocketMQ和Kafka,來給予應用程式實現非同步訊息傳遞的功能。這些都是專業的訊息佇列中間件,其特性之多超出了我們的理解能力。

而這些訊息中間件使用起來的是複雜的,例如RabbitMQ,發送訊息之前要建立Exchange,還要建立Queue,然後將Exchange和Queue透過某種規則綁定起來,發送訊息的時候也要製定routing-key,也要控制頭訊息。這僅是生產者,消費者在消費訊息之前也要將上面一連串的繁瑣步驟再操作一次。

那麼對於那些不要求百分百可靠,並且希望實現簡單的訊息佇列需求時,我們可以透過Redis將我們從訊息佇列的中間件的繁瑣步驟中解脫出來。

Redis的訊息佇列不是專業的訊息佇列,他並沒有訊息佇列中許多的高階特性,也沒有ack保證。如果對訊息的可靠性有著極致的追求,請轉向專業的MQ中間件。 【相關建議:Redis影片教學

異步訊息佇列

從最簡單的非同步訊息佇列開始,Redis的list資料結構常用來作為非同步訊息佇列,透過lrpush/lpush來操作入列,透過rpop/lpop來出列。

淺析Redis中怎麼使用訊息佇列

問題一:空佇列

#對於pop操作來說,當訊息佇列空了的時候,客戶端會陷入pop的死循環,造成大量的浪費生命的空輪詢,導致客戶端CPU拉高,同時Redis的QPS也被拉高。

對於以上問題的解決方法就是透過list結構的blpop/brpop來操作出列,其中b前綴代表的就是blocking,阻塞讀取。對於阻塞讀在佇列沒有資料的時候就會進入休眠狀態,一旦資料到來就會立刻醒來。完美的解決了上面這個問題。

問題二:空閒連線斷開

阻塞讀取的方案看似完美,緊接著引出了另一個問題:空閒連結。如果執行緒一直阻塞在哪裡,Redis的客戶端連線就變成了空閒連線。空閒時間過長,Redis伺服器就會主動斷開連接,以減少閒置資源佔用。這時候blpop/brpop就會拋出異常來。

所以,我們在編寫客戶端(應用程式)消費者的時候需要小心,注意捕獲異常,並進行重試。

應用一:延時佇列

在Redis的分散式鎖定中一般有三種策略來處理加鎖失敗的情況:

  • 直接拋出異常,前端提醒使用者是否要繼續操作;

  • sleep一會再重試;

  • 將請求放到延時佇列中,一會再重試;

而Redis中延時佇列,我們可以透過zset(有序列表)資料結構來實現。我們將訊息序列化作為一個字串作為zse的value,而訊息的到期處理時間(延時時間)作為score。然後透過輪詢zset取得到期時間進行處理,透過zrem將key從zset移除代表成功消費,進而處理任務。

核心程式碼如下:

// 生产\
public void delay(T msg) {\
  TaskItem task = new TaskItem();\
  task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
  task.msg = msg;\
  String s = JSON.toJSONString(task); // fastjson 序列化\
  jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\
}\
// 消费\
public void loop() {\
  while (!Thread.interrupted()) {\
   // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\
   Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
   if (values.isEmpty()) {\
     try {\
       Thread.sleep(500); // 歇会继续\
    }\
     catch (InterruptedException e) {\
       break;\
    }\
     continue;\
  }\
   String s = values.iterator().next();  //消费队列\
   if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\
     TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
     this.handleMsg(task.msg);\
  }\
}\
}

以上的程式碼在多執行緒中對於同一個任務被多個執行緒爭搶的情況,雖然能夠透過zrem後在處理任務來避免一個任務被多次消費的情況。但是對於那些獲取到了任務但是沒有成功消費的線程來說,都是白白浪費時間取了一次任務。所以可以考慮透過lua scripting來優化這個邏輯。將zrangeByScore和zrem一同挪到伺服器進行原子操作,就能夠完美解決了。

更多程式相關知識,請造訪:程式設計入門! !

以上是淺析Redis中怎麼使用訊息佇列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除