ホームページ  >  記事  >  データベース  >  Redis のパブリッシュとサブスクライブを実装する方法

Redis のパブリッシュとサブスクライブを実装する方法

王林
王林転載
2023-06-02 14:37:561662ブラウズ

このようなビジネス シナリオがあるとします。Web サイトで注文して支払いを行った後、配送処理のために在庫サービスに通知する必要があります。

上記業務の実現は難しくなく、該当店舗に棚卸しサービスを提供し、注文・決済後に棚卸しサービスを呼び出すだけです。

Redis のパブリッシュとサブスクライブを実装する方法

後からポイントサービスなどの新規事業があれば、注文の決済結果を取得してユーザーのポイントを増やす必要があります。

これは実装は難しくありません。ポイント サービスにもインターフェイスが用意されており、注文と支払いが完了したら、在庫サービスを呼び出すだけです。

Redis のパブリッシュとサブスクライブを実装する方法

注文の支払い結果を取得する必要がある企業が 2 社のみの場合、プログラムを変更するのは比較的簡単です。しかし、ビジネスが発展し続けるにつれて、注文と支払いを行う必要がある新しいビジネスがますます増えています。

現時点では、上記のシステム アーキテクチャには多くの問題があることがわかります:

まず、注文支払いビジネスは他のビジネスと密接に結びついています。決済結果により、注文決済業務の変更が必要となります。

第 2 に、呼び出されるサービスが多すぎると、注文支払いインターフェイスの応答時間が長くなります。同期により注文決済インターフェースの応答が長くなる理由の 1 つは、下流インターフェースの応答が遅くなることです。

3 番目に、ダウンストリーム インターフェイスで障害が発生すると、データの不整合が生じる可能性があります。たとえば、次の図では、最初に A が呼び出され、次に成功後に B が呼び出され、最後に C が呼び出されます。

Redis のパブリッシュとサブスクライブを実装する方法

B インターフェイスの呼び出し時に例外が発生した場合、注文支払いインターフェイスが返らない可能性がありますが、この時点では実際には A インターフェイスが正常に呼び出されています。つまり、注文の支払いが成功した結果は内部で処理されました。

これにより、3 つのダウンストリーム インターフェイス A、B、C が生成されます。A は決済結果の取得に成功しましたが、B と C は決済結果の取得に失敗し、3 つのシステムのシステム データに不整合が発生します。

実際、よく考えてみると、注文決済ビジネスの場合、通知する仕組みがあれば、実際には下流側の呼び出し結果を気にする必要はありません。

そういえば、今日導入する必要があるパブリッシュとサブスクライブのメカニズムを導入する必要があります。

Redis のパブリッシュとサブスクライブ

Redis は、「パブリッシュ/サブスクライブ」モデルに基づいたメッセージ メカニズムを提供します。このモデルでは、メッセージ パブリッシャーとサブスクライバーが直接通信する必要はありません。

Redis のパブリッシュとサブスクライブを実装する方法

上の図に示すように、メッセージ発行者は指定されたチャネルにメッセージを発行するだけでよく、チャネルに登録しているすべてのクライアントがメッセージを受信できます。

Redis パブリッシュおよびサブスクライブ メカニズムを使用すると、上記のビジネスの場合、注文支払いビジネスは Payment Result チャネルにメッセージを送信するだけでよく、他の下流ビジネスは チャネルにサブスクライブします。支払い結果 このチャネルでは、対応するメッセージを受信し、業務処理を実行できます。

このようにして、システムの上流と下流の間の呼び出し関係を分離できます。

次に、Redis のパブリッシュおよびサブスクライブ機能の使用方法を見てみましょう。

Redis は、パターンに従ってメッセージのパブリッシュ、チャネルのサブスクライブ、サブスクライブ解除、およびサブスクライブに使用できる一連のコマンドを提供します。

まず、メッセージを公開する方法を見てみましょう。実際は非常に簡単です。publish コマンドを使用するだけです:

publish channel message

Redis のパブリッシュとサブスクライブを実装する方法

#上の図では、publish コマンドを使用して、pay_result チャネルにメッセージを送信します。 redis が 0 を返していることがわかりますが、これは実際には現在のサブスクライバー数を表しており、現時点ではサブスクリプションがないため、返される結果は 0 です。

次に、subscribe を使用して 1 つ以上のチャンネルを購読します

subscribe channel [channel ...]

Redis のパブリッシュとサブスクライブを実装する方法

上の図に示すように、購読しますpay_result このチャネル、他のクライアントがこのチャネルにメッセージを送信すると、

Redis のパブリッシュとサブスクライブを実装する方法

現在の購読者がメッセージを受信します。

Redis のパブリッシュとサブスクライブを実装する方法

subscription コマンドを使用するときは、次の点に注意する必要があります。

まず、クライアントが subscription コマンドを実行すると、サブスクリプション状態に入ります。受信できるコマンドは subscribepsubscribeunsubscribepunsubscribe の 4 つだけです。

Redis のパブリッシュとサブスクライブを実装する方法

第二,新订阅的客户端,是无法收到这个频道之前的消息,这是因为 Redis 并不会对发布的消息持久化的。

相比于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。如果当前的使用场景可以容忍这些缺点,那么简单优秀的 redis 发布订阅功能值得选择。

除了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 * 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。

使用 Redis 订阅模式,我们需要使用一个新的指令 psubscribe

我们执行下面这个指令:

psubscribe pay.*

那么一旦有其他客户端往 pay 开头的频道,比如 pay_resultpay_xxx,我们都可以收到消息。

Redis のパブリッシュとサブスクライブを実装する方法

如果需要取消订阅模式,我们需要使用相应punsubscribe 指令,比如取消上面订阅的模式:

punsubscribe pay.*

Redis 客户端发布订阅使用方式

基于 Jedis 开发发布/订阅

聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何使用发布订阅。

下面的例子主要基于 Jedis,maven 版本为:

<dependency>
 <groupid>redis.clients</groupid>
 <artifactid>jedis</artifactid>
 <version>3.1.0</version>
</dependency>

其他 Redis 客户端大同小异。

jedis 发布代码比较简单,只需要调用 Jedis 类的 publish 方法。

// 生产环境千万不要这么使用哦,推荐使用 JedisPool 线程池的方式 
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");

订阅的代码就相对复杂了,我们需要继承 JedisPubSub 实现里面的相关方法,一旦有其他客户端往订阅的频道上发送消息,将会调用 JedisPubSub 相应的方法。

private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("收到订阅频道:" + channel + " 消息:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("收到具体订阅频道:" + channel + "订阅模式:" + pattern + " 消息:" + message);
    }

}

其次我们需要调用 Jedis 类的 subscribe 方法:

Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");

当有其他客户端往 pay_result频道发送消息时,订阅将会收到消息。

Redis のパブリッシュとサブスクライブを実装する方法

不过需要注意的是,jedis#subscribe 是一个阻塞方法,调用之后将会阻塞主线程的,所以如果需要在正式项目使用需要使用异步线程运行,这里就不演示具体的代码了。

基于 Spring-Data-Redis 开发发布订阅

原生 jedis 发布订阅操作,相对来说还是有点复杂。现在我们很多应用已经基于 SpringBoot 开发,使用 spring-boot-starter-data-redis ,可以简化发布订阅开发。

首先我们需要引入相应的 startter 依赖:

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-data-redis</artifactid>
    <exclusions>
        <exclusion>
            <artifactid>lettuce-core</artifactid>
            <groupid>io.lettuce</groupid>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupid>redis.clients</groupid>
    <artifactid>jedis</artifactid>
</dependency>

这里我们使用 Jedis 当做底层连接客户端,所以需要排除 lettuce,然后引入 Jedis 依赖。

然后我们需要创建一个消息接收类,里面需要有方法消费消息:

@Slf4j
public class Receiver {
    private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {
        log.info("Received ");
        counter.incrementAndGet();
    }

    public int getCount() {
        return counter.get();
    }
}

接着我们只需要注入 Spring- Redis 相关 Bean,比如:

  • StringRedisTemplate,用来操作 Redis 命令

  • MessageListenerAdapter ,消息监听器,可以在这个类注入我们上面创建消息接受类 Receiver

  • RedisConnectionFactory, 创建 Redis 底层连接

@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅指定频道使用 ChannelTopic
        // 订阅模式使用 PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // 注入 Receiver,指定类中的接受方法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

}

最后我们使用 StringRedisTemplate#convertAndSend 发送消息,同时 Receiver 将会收到一条消息。

@SpringBootApplication
public class MessagingRedisApplication {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {
            template.convertAndSend("pay_result", "Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0);
    }
}

Redis のパブリッシュとサブスクライブを実装する方法

Redis 发布订阅实际应用

Redis Sentinel 节点发现

Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。

今天这里我们不详细解释 Redis Sentinel 详细原理,主要来看下 Redis Sentinel 如何使用发布订阅机制。

Redis Sentinel 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。

如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello 频道发送消息,并且每个 Sentinel 都会订阅这个节点。

Redis のパブリッシュとサブスクライブを実装する方法

这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。

这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。

除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。

以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。

Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。

对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),

客户端可以订阅 +switch-master频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的消息。

redission 分布式锁

redission 开源框架提供一些便捷操作 Redis 的方法,其中比较出名的 redission 基于 Redis 的实现分布式锁。

今天我们来看下 Redis 的实现分布式锁中如何使用 Redis 发布订阅机制,提高加锁的性能。

PS:redission 分布式锁实现原理,可以参考之前写过的文章:

  1. 可重入分布式锁的实现方式

  2. Redis 分布式锁,看似简单,其实真不简单

首先我们来看下 redission 加锁的方法:

Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();

RLock 继承自 Java 标准的 Lock 接口,调用 lock 方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被阻塞,直到其他客户端释放这把锁。

这里其实有个问题,当前阻塞的线程如何感知分布式锁已被释放呢?

这里其实有两种实现方法:

第一钟,定时查询分布时锁的状态,一旦查到锁已被释放(Redis 中不存在这个键值),那么就去加锁。

实现伪码如下:

while (true) {
  boolean result=lock();
  if (!result) {
    Thread.sleep(N);
  }
}

这种方式实现起来起来简单,不过缺点也比较多。

如果定时任务时间过短,将会导致查询次数过多,其实这些都是无效查询。

如果定时任务休眠时间过长,那又会导致加锁时间过长,导致加锁性能不好。

那么第二种实现方案,就是采用服务通知的机制,当分布式锁被释放之后,客户端可以收到锁释放的消息,然后第一时间再去加锁。

这个服务通知的机制我们可以使用 Redis 发布订阅模式。

当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx(xx 代表锁的名称) 频道,使用异步线程监听消息,然后利用 Java 中 Semaphore 使当前线程进入阻塞。

一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx 发送解锁消息。

等异步线程收到消息,将会调用 Semaphore 释放信号量,从而让当前被阻塞的线程唤醒去加锁。

ps:这里只是简单描述了 redission 加锁部分原理,出于篇幅,这里就不再消息解析源码。

感兴趣的小伙伴可以自己看下 redission 加锁的源码。

通过发布订阅机制,被阻塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。

ps: 这种方式,性能确实提高,但是实现起来的复杂度也很高,这部分源码有点东西,快看晕了。

以上がRedis のパブリッシュとサブスクライブを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。