ホームページ  >  記事  >  データベース  >  Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

WBOY
WBOY転載
2022-05-23 12:15:043235ブラウズ

この記事では、Redis に関する関連知識を提供します。主に、キューのブロック、遅延、パブリッシュ、サブスクリプションの実装方法に関する関連問題を紹介します。一緒に見てみましょう。皆さんのご協力を願っています。 。

Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

推奨学習: Redis ビデオ チュートリアル

Redis はキャッシュ サーバーとしてだけでなく、メッセージ キューとしても使用できます。そのリスト タイプは本質的にメッセージ キューとしての使用をサポートしています。以下の図に示すように、
Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

#Redis リストは二重リンクリストを使用して実装されているため、先頭ノードと末尾ノードが保存されるため、その両側の要素を挿入または取得します。リストの先頭と末尾は非常に高速で、時間計算量は O(1) です。

通常のキュー

Redis リスト データ型を直接使用して、lpush と rpop、または rpush と lpop という 2 つの簡単な命令だけでメッセージ キューを実装できます。

    lpush rpop: left-in および right-out キュー
  • rpush lpop: left-out および right-in キュー
以下では、通常のキューをシミュレートする redis コマンド。

lpush コマンドを使用してメッセージを生成します:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
rpop コマンドを使用してメッセージを消費します:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
以下では、Java コードを使用して共通キューを実装します。

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i Consumer SingleConsumer: <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}
上記のコードは基本的に通常のキューの生成と消費を実現していますが、上記の例ではメッセージのコンシューマが存在します。

    消費者は、redis リストに処理対象のデータ (メッセージ) があるかどうかを確認するために、常に rpop メソッドを呼び出す必要があります。接続は呼び出されるたびに開始されます。リストにデータがない可能性があるため、多数の空のポーリングが発生し、不要な無駄が発生します。 Thread.sleep() やその他のメソッドを使用して、一定期間後にコンシューマ スレッドが再び消費できるようにすることもできます。スリープ時間が長すぎると、一部の時間に敏感なメッセージが処理できなくなります。スリープ時間が短すぎる場合は、場合、接続上での比較も発生し、大きなオーバーヘッドが発生します。
  1. プロデューサーの速度がコンシューマーの消費速度よりも速い場合、メッセージ キューの長さは増加し続け、時間の経過とともに多くのメモリ領域を占有することになります。
ブロッキングキュー

コンシューマーは brpop 命令を使用して、redis リストからデータを取得できます。この命令は、要素がある場合にのみ返されます。要素がない場合は、タイムアウトになるまでブロックされ、 null を返す. したがって、コンシューマーはデータを取得するためにスリープする必要はありません。これは、ブロッキング キューを実装するのと同等です。

redis の brpop コマンドを使用して、ブロッキング キューをシミュレートします。

>brpop queue:single 30
コマンド ラインが brpop でブロックされており、データがなければ 30 秒後に戻ることがわかります。

Java コードは次のように実装されます。

プロデューサーは通常のキューのプロデューサーと同じです。

Consumer BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>
短所: 1 つの生産と複数の消費は実現できません。

パブリッシュおよびサブスクライブ モード

メッセージ キューのサポートを提供することに加えて、Redis はパブリッシュ/サブスクライブ モードをサポートする一連のコマンドも提供します。 Redis のパブリッシュ/サブスクライブ モードを使用すると、1 回生成して複数回消費するキューを実装できます。

Publish: PUBLISH 命令を使用してメッセージを公開できます。形式:

PUBLISH channel message
戻り値は、メッセージの購読者数を示します。

サブスクリプション: SUBSCRIBE 命令はメッセージを受信するために使用されます。形式:

SUBSCRIBE channel
SUBSCRIBE 命令を使用した後、サブスクリプション モードに入りますが、その前にパブリッシュによって送信されたメッセージは受信されません。これは、サブスクリプションのみが送信されるまでメッセージを受信しないためです。このモードの他のコマンドについては、応答のみが表示されます。

返信は 3 つのタイプに分類されます。

    subscribe の場合、2 番目の値は購読したチャネルを示し、3 番目の値は購読したチャネルの数を示します
  1. メッセージ (メッセージ) の場合、2 番目の値はメッセージを生成したチャネル、3 番目の値はメッセージです。
  2. unsubscribe の場合、2 番目の値は購読を中止するチャネルを表します。 3 番目の値はメッセージを生成したチャネルで、現在のクライアントのサブスクリプション番号を表します。
以下では、redis コマンドを使用してパブリッシュ/サブスクライブ モードをシミュレートします。

プロデューサー:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
コンシューマー:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
Java コードは次のように実装されます:

プロデューサー PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i コンシューマー PubsubConsumer : <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}
コンシューマは複数のコンシューマを起動でき、各コンシューマはすべてのメッセージを受信できます。

コマンド UNSUBSCRIBE を使用して購読を解除できます。パラメーターを追加しない場合、SUBSCRIBE コマンドによって購読されたすべてのチャネルが購読解除されます。

Redis は、コマンド PSUBSCRIBE (パターン サブスクライブ) を使用した、ワイルドカードに基づくメッセージ サブスクリプションもサポートしています。例:

psubscribe channel.*
PSUBSCRIBE コマンドでサブスクライブされたチャネルは、サブスクライブを解除するためにコマンド PUNSUBSCRIBE コマンドも使用する必要があります。このコマンドは、SUBSCRIBE によって購読されたチャネルから購読を解除することはできません。同様に、UNSUBSCRIBE は、PSUBSCRIBE コマンドによって購読されたチャネルから購読を解除することはできません。

同時に、PUNSUBSCRIBE 命令のワイルドカードは展開されません。例:

PUNSUBSCRIBE \*channel.\* と一致しないため、channel.\* の購読を解除するには、PUBSUBSCRIBE チャネルを記述する必要があります。 \*

Redis の pub/sub には欠点もあります。つまり、コンシューマーがオフラインになると、プロデューサーのメッセージが失われます。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>

以上がRedis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。