Rumah >pangkalan data >Redis >Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan
Artikel ini membawakan anda pengetahuan yang berkaitan tentang Redis Terutamanya memperkenalkan isu berkaitan tentang cara melaksanakan penyekatan baris gilir, penangguhan, penerbitan dan langganan Mari kita lihat bersama-sama.
Pembelajaran yang disyorkan: Tutorial video Redis
Redis bukan sahaja boleh digunakan sebagai pelayan cache, tetapi juga sebagai baris gilir mesej . Jenis senarainya sememangnya menyokong penggunaan sebagai baris gilir mesej. Seperti yang ditunjukkan dalam rajah di bawah:
Memandangkan senarai Redis dilaksanakan menggunakan senarai berganda, nod kepala dan nod ekor disimpan, jadi memasukkan atau mendapatkan elemen pada kedua-dua belah kepala dan ekor senarai adalah Ia sangat pantas dan kerumitan masa ialah O(1).
Anda boleh terus menggunakan jenis data senarai Redis untuk melaksanakan baris gilir mesej, dengan hanya dua arahan mudah: lpush dan rpop atau rpush dan lpop.
Yang berikut menggunakan perintah redis untuk mensimulasikan baris gilir biasa.
Gunakan arahan lpush untuk menghasilkan mesej:
>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
Gunakan arahan rpop untuk menggunakan mesej:
>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
Yang berikut menggunakan kod Java untuk melaksanakan baris gilir biasa.
Producer SingleProducer
package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/** * 生产者 */public class SingleProducer { public static final String SINGLE_QUEUE_NAME = "queue:single"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i <p>Consumer SingleConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/** * 消费者 */public class SingleConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME); if(Objects.nonNull(message)) { System.out.println(message); } else { TimeUnit.MILLISECONDS.sleep(500); } } }}
Kod di atas pada asasnya telah melaksanakan pengeluaran dan penggunaan baris gilir biasa, tetapi dalam contoh di atas Terdapat dua masalah untuk pengguna mesej:
Pengguna boleh menggunakan arahan brpop untuk mendapatkan data daripada senarai redis Arahan ini hanya akan kembali jika terdapat elemen Jika tidak, ia akan menyekat sehingga tamat masa return null. Oleh itu, pengguna tidak perlu tidur untuk mendapatkan data, yang setara dengan melaksanakan baris gilir menyekat.
Anda dapat melihat bahawa baris arahan disekat di sini dalam brpop dan akan kembali selepas 30 saat tanpa data.>brpop queue:single 30
Kod Java dilaksanakan seperti berikut:
Pengeluar adalah sama dengan pengeluar baris gilir biasa.
Blok PenggunaPengguna:
Kelemahan: Satu pengeluaran dan penggunaan berganda tidak boleh dicapai.package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/** * 消费者 */public class BlockConsumer { public static void main(String[] args) { Jedis jedis = new Jedis(); while (true) { // 超时时间为1s List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME); if (null != messageList && !messageList.isEmpty()) { System.out.println(messageList); } } }}</string>
Mod terbitkan dan langgan
Terbitkan: Arahan PUBLISH boleh digunakan untuk menerbitkan mesej, format:
Nilai pulangan menunjukkan bilangan pelanggan kepada mesej.PUBLISH channel message
Langganan: Arahan SUBSCRIBE digunakan untuk menerima mesej Formatnya ialah:
Selepas menggunakan arahan LANGGANAN, anda memasuki mod langganan, tetapi anda tidak akan menerima mesej. dihantar dengan menerbitkan sebelum melanggan Ini Kerana langganan hanya akan menerima mesej sebelum ia dihantar. Untuk arahan lain dalam mod ini, hanya balasan boleh dilihat.SUBSCRIBE channel
Balasan terbahagi kepada tiga jenis:
Jika ia melanggan, nilai kedua mewakili saluran yang dilanggan dan nilai ketiga mewakili bilangan saluran yang dilangganPengeluar:
Pengguna:127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1Kod Java dilaksanakan seperti berikut:
127.0.0.1:6379> subscribe queue Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
Penerbit PubsubProducer:
Pengguna PubsubPengguna:package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/** * 生产者 */public class PubsubProducer { public static final String PUBSUB_QUEUE_NAME = "queue:pubsub"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i <p></p>Pengguna boleh memulakan berbilang pengguna dan setiap pengguna boleh menerima semua mesej. <pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * 消费者 */public class PubsubConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println("receive message: " + message); if(message.indexOf("99") > -1) { this.unsubscribe(); } } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("subscribe channel: " + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("unsubscribe channel " + channel); } }; jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME); }}
Anda boleh menggunakan perintah UNSUBSCRIBE untuk menyahlanggan Jika tiada parameter ditambahkan, semua saluran yang dilanggan oleh arahan SUBSCRIBE akan dinyahlanggan.
Redis juga menyokong langganan mesej berasaskan kad bebas, gunakan arahan PSUBSCRIBE (langgan corak), contohnya:
Saluran yang dilanggan dengan arahan PSUBSCRIBE juga mesti menggunakan arahan PUNSUBSCRIBE untuk berhenti melanggan. Perintah tidak boleh berhenti melanggan saluran yang dilanggan oleh LANGGAN Begitu juga, UNSUBSCRIBE tidak boleh menyahlanggan saluran yang dilanggan oleh arahan PSUBSCRIBE.psubscribe channel.*
Pada masa yang sama, kad liar arahan PUNSUBSCRIBE tidak akan dikembangkan. Contohnya:
tidak akan sepadan dengan, jadi untuk menyahlanggan PUNSUBSCRIBE *
anda perlu menulis channel.*
seperti ini. channel.*
PUBSUBSCRIBE channel.*
Pub/sub Redis juga mempunyai kelemahannya, iaitu, jika pengguna pergi ke luar talian, mesej pengeluar akan hilang.
Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。
如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。
如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。
下面使用redis的zset来模拟延时队列。
生产者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
消费者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
Java代码如下:
生产者DelayProducer:
package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/** * 生产者 */public class DelayProducer { public static final String DELAY_QUEUE_NAME = "queue:delay"; public static void main(String[] args) { Jedis jedis = new Jedis(); long now = new Date().getTime(); Random random = new Random(); for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/** * 消费者 */public class DelayConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { long now = new Date().getTime(); Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0); if(tupleSet.isEmpty()) { TimeUnit.MILLISECONDS.sleep(500); } else { for (Tuple tuple : tupleSet) { Double score = tuple.getScore(); long time = score.longValue(); if(time <h2>应用场景</h2> <ul> <li>延时队列可用于订单超时失效的场景</li> <li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li> </ul> <p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>
Atas ialah kandungan terperinci Cara redis melaksanakan penyekatan baris gilir, kelewatan, penerbitan dan langganan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!