搜尋
首頁資料庫Redisredis怎麼實現隊列的阻塞、延時、發布和訂閱

這篇文章為大家帶來了關於Redis的相關知識,其中主要介紹了關於怎麼實現隊列的阻塞、延時、發布和訂閱的相關問題,下面一起來看一下,希望對大家有幫助。

redis怎麼實現隊列的阻塞、延時、發布和訂閱

推薦學習:Redis視訊教學

#Redis不僅可作為快取伺服器,也可以用作訊息佇列。它的列表類型天生支援用作訊息隊列。如下圖所示:
redis怎麼實現隊列的阻塞、延時、發布和訂閱

由於Redis的列表是使用雙向鍊錶實現的,保存了頭節點和尾節點,所以在列表的頭部和尾部兩邊插入或獲取元素都是非常快的,時間複雜度為O(1)。

普通佇列

可以直接使用Redis的list資料類型實作訊息佇列,只需簡單的兩個指令lpush和rpop或rpush和lpop。

  • lpush rpop:左進右出的佇列
  • rpush lpop:左出右進的佇列

下面使用redis的指令來模擬普通隊列。
使用lpush指令生產訊息:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"

使用rpop指令消費訊息:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"

下面使用Java程式碼來實作普通佇列。

生產者SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>消費者SingleConsumer:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}

上面的程式碼已經基本實現了普通隊列的生產與消費,但是上述的例子中訊息的消費者有兩個問題:

  1. 消費者需要不停的呼叫rpop方法來查看redis的list中是否有待處理的資料(訊息)。每呼叫一次都會發起一次連接,有可能list中沒有數據,造成大量的空輪詢,導致不必要的浪費。也許你可以使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,如果睡眠時間過長,這樣不能處理一些時效性要求高的消息,睡眠時間過短,也會在連接上造成比較大的開銷。
  2. 如果生產者速度大於消費者消費速度,訊息佇列長度會一直增大,時間久了會佔用大量記憶體空間。

阻塞隊列

消費者可以使用brpop指令從redis的list中獲取數據,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端就不需要休眠後取得資料了,這樣就等於實作了一個阻塞佇列,

使用redis的brpop指令來模擬阻塞佇列。

>brpop queue:single 30

可以看到命令列阻塞在了brpop這裡了,30s後沒資料就回傳。

Java程式碼實作如下:

生產者與普通佇列的生產者一致。

消費者BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>

缺點:無法實現一次生產多次消費。

發布訂閱模式

Redis除了對訊息佇列提供支援外,還提供了一組命令用於支援發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

發佈:PUBLISH指令可用於發布一則訊息,格式:

PUBLISH channel message

傳回值表示訂閱了該訊息的數量。

訂閱:SUBSCRIBE指令用於接收一條訊息,格式:

SUBSCRIBE channel

使用SUBSCRIBE指令後進入了訂閱模式,但是不會接收到訂閱之前publish發送的訊息,這是因為只有在訊息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回應。

回覆分為三種:

  1. 如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是已訂閱的頻道的數量
  2. 如果為message(訊息),第二個值為產生該訊息的頻道,第三個值為訊息
  3. 如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示目前客戶端的訂閱數量。

下面使用redis的指令來模擬發布訂閱模式。

生產者:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1

消費者:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"

Java程式碼實作如下:

生產者PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>消費者PubsubConsumer:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}

消費者可以啟動多個,每個消費者都能收到所有的訊息。

可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會取消所有由SUBSCRIBE指令訂閱的頻道。

Redis也支援基於通配符的訊息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,指令無法指令退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也無法退訂PSUBSCRIBE指令訂閱的頻道。

同時PUNSUBSCRIBE指令通配符不會展開。例如:PUNSUBSCRIBE \*不會符合到channel.\*,所以要取消訂閱channel.\*就要這樣寫PUBSUBSCRIBE channel. \*

Redis的pub/sub也有其缺點,那就是如果消費者下線,生產者的消息會遺失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2 id="应用场景">应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>

以上是redis怎麼實現隊列的阻塞、延時、發布和訂閱的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:CSDN。如有侵權,請聯絡admin@php.cn刪除
REDIS:探索其功能和功能REDIS:探索其功能和功能Apr 19, 2025 am 12:04 AM

Redis脫穎而出是因為其高速、多功能性和豐富的數據結構。 1)Redis支持字符串、列表、集合、散列和有序集合等數據結構。 2)它通過內存存儲數據,支持RDB和AOF持久化。 3)從Redis6.0開始引入多線程處理I/O操作,提升了高並發場景下的性能。

Redis是SQL還是NOSQL數據庫?答案解釋了Redis是SQL還是NOSQL數據庫?答案解釋了Apr 18, 2025 am 12:11 AM

RedisisclassifiedasaNoSQLdatabasebecauseitusesakey-valuedatamodelinsteadofthetraditionalrelationaldatabasemodel.Itoffersspeedandflexibility,makingitidealforreal-timeapplicationsandcaching,butitmaynotbesuitableforscenariosrequiringstrictdataintegrityo

REDIS:提高應用程序性能和可擴展性REDIS:提高應用程序性能和可擴展性Apr 17, 2025 am 12:16 AM

Redis通過緩存數據、實現分佈式鎖和數據持久化來提升應用性能和可擴展性。 1)緩存數據:使用Redis緩存頻繁訪問的數據,提高數據訪問速度。 2)分佈式鎖:利用Redis實現分佈式鎖,確保在分佈式環境中操作的安全性。 3)數據持久化:通過RDB和AOF機制保證數據安全性,防止數據丟失。

REDIS:探索其數據模型和結構REDIS:探索其數據模型和結構Apr 16, 2025 am 12:09 AM

Redis的數據模型和結構包括五種主要類型:1.字符串(String):用於存儲文本或二進制數據,支持原子操作。 2.列表(List):有序元素集合,適合隊列和堆棧。 3.集合(Set):無序唯一元素集合,支持集合運算。 4.有序集合(SortedSet):帶分數的唯一元素集合,適用於排行榜。 5.哈希表(Hash):鍵值對集合,適合存儲對象。

REDIS:對其數據庫方法進行分類REDIS:對其數據庫方法進行分類Apr 15, 2025 am 12:06 AM

Redis的數據庫方法包括內存數據庫和鍵值存儲。 1)Redis將數據存儲在內存中,讀寫速度快。 2)它使用鍵值對存儲數據,支持複雜數據結構,如列表、集合、哈希表和有序集合,適用於緩存和NoSQL數據庫。

為什麼要使用redis?利益和優勢為什麼要使用redis?利益和優勢Apr 14, 2025 am 12:07 AM

Redis是一個強大的數據庫解決方案,因為它提供了極速性能、豐富的數據結構、高可用性和擴展性、持久化能力以及廣泛的生態系統支持。 1)極速性能:Redis的數據存儲在內存中,讀寫速度極快,適合高並發和低延遲應用。 2)豐富的數據結構:支持多種數據類型,如列表、集合等,適用於多種場景。 3)高可用性和擴展性:支持主從復制和集群模式,實現高可用性和水平擴展。 4)持久化和數據安全:通過RDB和AOF兩種方式實現數據持久化,確保數據的完整性和可靠性。 5)廣泛的生態系統和社區支持:擁有龐大的生態系統和活躍社區,

了解NOSQL:Redis的關鍵特徵了解NOSQL:Redis的關鍵特徵Apr 13, 2025 am 12:17 AM

Redis的關鍵特性包括速度、靈活性和豐富的數據結構支持。 1)速度:Redis作為內存數據庫,讀寫操作幾乎瞬時,適用於緩存和會話管理。 2)靈活性:支持多種數據結構,如字符串、列表、集合等,適用於復雜數據處理。 3)數據結構支持:提供字符串、列表、集合、哈希表等,適合不同業務需求。

REDIS:確定其主要功能REDIS:確定其主要功能Apr 12, 2025 am 12:01 AM

Redis的核心功能是高性能的內存數據存儲和處理系統。 1)高速數據訪問:Redis將數據存儲在內存中,提供微秒級別的讀寫速度。 2)豐富的數據結構:支持字符串、列表、集合等,適應多種應用場景。 3)持久化:通過RDB和AOF方式將數據持久化到磁盤。 4)發布訂閱:可用於消息隊列或實時通信系統。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

EditPlus 中文破解版

EditPlus 中文破解版

體積小,語法高亮,不支援程式碼提示功能

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具