Rumah >pangkalan data >Redis >Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan

Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan

WBOY
WBOYke hadapan
2022-05-23 12:15:043422semak imbas

Artikel ini membawakan anda pengetahuan yang berkaitan tentang Redis Terutamanya memperkenalkan isu berkaitan tentang cara melaksanakan penyekatan baris gilir, penangguhan, penerbitan dan langganan Mari kita lihat bersama-sama.

Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan

Pembelajaran yang disyorkan: Tutorial video Redis

Redis bukan sahaja boleh digunakan sebagai pelayan cache, tetapi juga sebagai baris gilir mesej . Jenis senarainya sememangnya menyokong penggunaan sebagai baris gilir mesej. Seperti yang ditunjukkan dalam rajah di bawah:
Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan

Memandangkan senarai Redis dilaksanakan menggunakan senarai berganda, nod kepala dan nod ekor disimpan, jadi memasukkan atau mendapatkan elemen pada kedua-dua belah kepala dan ekor senarai adalah Ia sangat pantas dan kerumitan masa ialah O(1).

Baris Gilir Biasa

Anda boleh terus menggunakan jenis data senarai Redis untuk melaksanakan baris gilir mesej, dengan hanya dua arahan mudah: lpush dan rpop atau rpush dan lpop.

  • lpush rpop: beratur dengan kiri masuk dan kanan keluar
  • rpush lpop: baris gilir dengan kiri masuk dan kanan dalam

Yang berikut menggunakan perintah redis untuk mensimulasikan baris gilir biasa.
Gunakan arahan lpush untuk menghasilkan mesej:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"

Gunakan arahan rpop untuk menggunakan mesej:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"

Yang berikut menggunakan kod Java untuk melaksanakan baris gilir biasa.

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer SingleConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}

Kod di atas pada asasnya telah melaksanakan pengeluaran dan penggunaan baris gilir biasa, tetapi dalam contoh di atas Terdapat dua masalah untuk pengguna mesej:

  1. Pengguna perlu sentiasa menghubungi kaedah rpop untuk menyemak sama ada terdapat data (mesej) yang belum selesai dalam senarai redis. Sambungan akan dimulakan setiap kali ia dipanggil Mungkin tiada data dalam senarai, mengakibatkan sejumlah besar undian kosong dan pembaziran yang tidak perlu. Mungkin anda boleh menggunakan Thread.sleep() dan kaedah lain untuk membenarkan urutan pengguna digunakan semula selepas tempoh masa Jika masa tidur terlalu lama, sesetengah mesej sensitif masa tidak boleh diproses Jika masa tidur terlalu singkat , ia juga akan menyebabkan perbandingan pada sambungan.
  2. Jika kelajuan pengeluar lebih besar daripada kelajuan penggunaan pengguna, panjang baris gilir mesej akan terus meningkat, yang akan menduduki banyak ruang memori dari semasa ke semasa.

Menyekat baris gilir

Pengguna boleh menggunakan arahan brpop untuk mendapatkan data daripada senarai redis Arahan ini hanya akan kembali jika terdapat elemen Jika tidak, ia akan menyekat sehingga tamat masa return null. Oleh itu, pengguna tidak perlu tidur untuk mendapatkan data, yang setara dengan melaksanakan baris gilir menyekat.

Anda dapat melihat bahawa baris arahan disekat di sini dalam brpop dan akan kembali selepas 30 saat tanpa data.
>brpop queue:single 30

Kod Java dilaksanakan seperti berikut:

Pengeluar adalah sama dengan pengeluar baris gilir biasa.

Blok PenggunaPengguna:

Kelemahan: Satu pengeluaran dan penggunaan berganda tidak boleh dicapai.
package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>

Mod terbitkan dan langgan

Selain menyediakan sokongan untuk baris gilir mesej, Redis juga menyediakan satu set arahan untuk menyokong mod terbitkan/langgan. Menggunakan mod pub/sub Redis, anda boleh melaksanakan baris gilir yang menghasilkan sekali dan menggunakan beberapa kali.

Terbitkan: Arahan PUBLISH boleh digunakan untuk menerbitkan mesej, format:

Nilai pulangan menunjukkan bilangan pelanggan kepada mesej.
PUBLISH channel message

Langganan: Arahan SUBSCRIBE digunakan untuk menerima mesej Formatnya ialah:

Selepas menggunakan arahan LANGGANAN, anda memasuki mod langganan, tetapi anda tidak akan menerima mesej. dihantar dengan menerbitkan sebelum melanggan Ini Kerana langganan hanya akan menerima mesej sebelum ia dihantar. Untuk arahan lain dalam mod ini, hanya balasan boleh dilihat.
SUBSCRIBE channel

Balasan terbahagi kepada tiga jenis:

Jika ia melanggan, nilai kedua mewakili saluran yang dilanggan dan nilai ketiga mewakili bilangan saluran yang dilanggan
  1. Jika ia adalah mesej (mesej), nilai kedua ialah saluran yang menghasilkan mesej, dan nilai ketiga ialah mesej
  2. Jika ia berhenti melanggan, nilai kedua mewakili saluran untuk berhenti melanggan, dan nilai ketiga ialah Nilai mewakili nombor langganan pelanggan semasa.
  3. Yang berikut menggunakan arahan redis untuk mensimulasikan mod terbitkan-langganan.

Pengeluar:

Pengguna:
127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1

Kod Java dilaksanakan seperti berikut:
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"

Penerbit PubsubProducer:

Pengguna PubsubPengguna:
package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p></p>Pengguna boleh memulakan berbilang pengguna dan setiap pengguna boleh menerima semua mesej. <pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}

Anda boleh menggunakan perintah UNSUBSCRIBE untuk menyahlanggan Jika tiada parameter ditambahkan, semua saluran yang dilanggan oleh arahan SUBSCRIBE akan dinyahlanggan.

Redis juga menyokong langganan mesej berasaskan kad bebas, gunakan arahan PSUBSCRIBE (langgan corak), contohnya:

Saluran yang dilanggan dengan arahan PSUBSCRIBE juga mesti menggunakan arahan PUNSUBSCRIBE untuk berhenti melanggan. Perintah tidak boleh berhenti melanggan saluran yang dilanggan oleh LANGGAN Begitu juga, UNSUBSCRIBE tidak boleh menyahlanggan saluran yang dilanggan oleh arahan PSUBSCRIBE.
psubscribe channel.*

Pada masa yang sama, kad liar arahan PUNSUBSCRIBE tidak akan dikembangkan. Contohnya:

tidak akan sepadan dengan

, jadi untuk menyahlanggan PUNSUBSCRIBE * anda perlu menulis channel.* seperti ini. channel.*PUBSUBSCRIBE channel.*Pub/sub Redis juga mempunyai kelemahannya, iaitu, jika pengguna pergi ke luar talian, mesej pengeluar akan hilang.

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>

Atas ialah kandungan terperinci Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:csdn.net. Jika ada pelanggaran, sila hubungi admin@php.cn Padam