웹사이트에서 주문하고 결제한 후 배송 처리를 위해 재고 서비스에 알려야 하는 비즈니스 시나리오가 있다고 가정해 보겠습니다.
위 사업을 구현하는 것은 어렵지 않습니다. 주문하고 결제한 후 재고 서비스를 호출하기만 하면 됩니다.
추후 포인트 서비스 등 새로운 사업이 있을 경우 주문 결제 결과를 받아 사용자의 포인트를 늘려야 합니다.
이것은 구현하기 어렵지 않습니다. 포인트 서비스는 주문하고 결제한 후 재고 서비스에 전화하기만 하면 됩니다.
주문 결제 결과를 받아야 하는 업체가 두 곳뿐이라면 프로그램을 수정하는 것이 비교적 쉽습니다. 그러나 비즈니스가 계속 발전함에 따라 점점 더 많은 신규 비즈니스에서 주문하고 결제해야 합니다.
이때 위의 시스템 아키텍처에는 많은 문제가 있음을 알 수 있습니다.
첫째, 주문 결제 사업은 결제 결과가 필요한 새로운 사업이 있을 때마다 다른 사업과 밀접하게 결합되어 있습니다. 변경됩니다.
둘째, 비즈니스 통화가 너무 많으면 주문 결제 인터페이스의 응답 시간이 길어집니다. 동기화로 인해 주문결제 인터페이스의 응답이 길어지는 이유 중 하나는 다운스트림 인터페이스의 응답이 느려지기 때문입니다.
셋째, 다운스트림 인터페이스가 실패하면 데이터 불일치가 발생할 수 있습니다. 예를 들어 아래 그림에서는 A가 먼저 호출되고 성공 후 B가 호출되고 마지막으로 C가 호출됩니다.
B 인터페이스를 호출할 때 예외가 발생하면 주문 결제 인터페이스가 실패를 반환할 수 있습니다. 그러나 이때 실제로는 A 인터페이스가 성공적으로 호출되었으므로 내부적으로 결과가 처리되었음을 의미합니다. 성공적인 주문 결제.
이로 인해 세 개의 다운스트림 인터페이스 A, B, C가 발생합니다. A는 성공적으로 결제 결과를 얻었지만 B와 C는 결제 결과를 얻지 못하여 세 시스템의 데이터에 불일치가 발생합니다.
사실 잘 생각해보면, 주문 결제 사업의 경우, 이를 통보할 수 있는 메커니즘이 있는 한 다운스트림 호출 결과에 실제로 신경 쓸 필요는 없습니다.
이 시점에서 오늘 도입해야 할 게시 및 구독 메커니즘을 소개해야 합니다.
Redis는 "게시/구독" 모델을 기반으로 하는 메시징 메커니즘을 제공합니다. 이 모델에서는 메시지 게시자와 구독자가 직접 통신할 필요가 없습니다.
위 그림과 같이 메시지 게시자는 지정된 채널에 메시지를 게시하기만 하면 되며 채널을 구독하는 모든 클라이언트는 이 메시지를 받을 수 있습니다.
Redis 게시 및 구독 메커니즘을 사용하면 위 비즈니스의 경우 주문 결제 비즈니스는 Payment Result 채널로 메시지를 보내기만 하면 되며 다른 하위 비즈니스는 Payment Result 채널을 구독하여 해당 메시지와 비즈니스 처리만 하세요.
이런 방식으로 시스템의 업스트림과 다운스트림 간의 호출 관계를 분리할 수 있습니다.
다음으로 Redis 게시 및 구독 기능을 사용하는 방법을 살펴보겠습니다.
Redis는 패턴에 따라 메시지 게시, 채널 구독, 구독 취소 및 구독에 사용할 수 있는 명령 세트를 제공합니다.
먼저 메시지를 게시하는 방법을 살펴보겠습니다. publish 명령을 사용하면 됩니다.
publish channel message
위 그림에서는 publish 명령을 사용하여 메시지를 보냅니다. pay_result 채널로 이동하세요. redis가 실제로 현재 구독자 수를 나타내는 0을 반환하는 것을 볼 수 있습니다. 현재 구독이 없으므로 반환 결과는 0입니다.
다음으로 subscribe를 사용하여 하나 이상의 채널을 구독합니다.
subscribe channel [channel ...]
위 그림에 표시된 대로 다른 클라이언트가 이 채널에 메시지를 보내면
이 채널을 구독합니다. 현재 구독자에게 메시지가 전송됩니다.
subscription 명령을 사용할 때 다음 사항에 주의해야 합니다.
먼저 클라이언트가 구독 명령을 실행한 후 구독 상태로 들어가고 그 다음에는
subscribe만 수신할 수 있습니다. , psubscribe, unsubscribe , punsubscribe 이 네 가지 명령입니다.
第二,新订阅的客户端,是无法收到这个频道之前的消息,这是因为 Redis 并不会对发布的消息持久化的。
相比于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。如果当前的使用场景可以容忍这些缺点,那么简单优秀的 redis 发布订阅功能值得选择。
除了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 *
号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。
使用 Redis 订阅模式,我们需要使用一个新的指令 psubscribe。
我们执行下面这个指令:
psubscribe pay.*
那么一旦有其他客户端往 pay 开头的频道,比如 pay_result
、pay_xxx
,我们都可以收到消息。
如果需要取消订阅模式,我们需要使用相应punsubscribe
指令,比如取消上面订阅的模式:
punsubscribe pay.*
聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何使用发布订阅。
下面的例子主要基于 Jedis,maven 版本为:
<dependency> <groupid>redis.clients</groupid> <artifactid>jedis</artifactid> <version>3.1.0</version> </dependency>其他 Redis 客户端大同小异。
jedis 发布代码比较简单,只需要调用 Jedis
类的 publish
方法。
// 生产环境千万不要这么使用哦,推荐使用 JedisPool 线程池的方式 Jedis jedis = new Jedis("localhost", 6379); jedis.auth("xxxxx"); jedis.publish("pay_result", "hello world");
订阅的代码就相对复杂了,我们需要继承 JedisPubSub
实现里面的相关方法,一旦有其他客户端往订阅的频道上发送消息,将会调用 JedisPubSub
相应的方法。
private static class MyListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("收到订阅频道:" + channel + " 消息:" + message); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("收到具体订阅频道:" + channel + "订阅模式:" + pattern + " 消息:" + message); } }
其次我们需要调用 Jedis
类的 subscribe
方法:
Jedis jedis = new Jedis("localhost", 6379); jedis.auth("xxx"); jedis.subscribe(new MyListener(), "pay_result");
当有其他客户端往 pay_result
频道发送消息时,订阅将会收到消息。
不过需要注意的是,jedis#subscribe
是一个阻塞方法,调用之后将会阻塞主线程的,所以如果需要在正式项目使用需要使用异步线程运行,这里就不演示具体的代码了。
原生 jedis 发布订阅操作,相对来说还是有点复杂。现在我们很多应用已经基于 SpringBoot 开发,使用 spring-boot-starter-data-redis
,可以简化发布订阅开发。
首先我们需要引入相应的 startter 依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> <exclusions> <exclusion> <artifactid>lettuce-core</artifactid> <groupid>io.lettuce</groupid> </exclusion> </exclusions> </dependency> <dependency> <groupid>redis.clients</groupid> <artifactid>jedis</artifactid> </dependency>
这里我们使用 Jedis 当做底层连接客户端,所以需要排除 lettuce,然后引入 Jedis 依赖。
然后我们需要创建一个消息接收类,里面需要有方法消费消息:
@Slf4j public class Receiver { private AtomicInteger counter = new AtomicInteger(); public void receiveMessage(String message) { log.info("Received "); counter.incrementAndGet(); } public int getCount() { return counter.get(); } }
接着我们只需要注入 Spring- Redis 相关 Bean,比如:
StringRedisTemplate
,用来操作 Redis 命令
MessageListenerAdapter
,消息监听器,可以在这个类注入我们上面创建消息接受类 Receiver
RedisConnectionFactory
, 创建 Redis 底层连接
@Configuration public class MessageConfiguration { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 订阅指定频道使用 ChannelTopic // 订阅模式使用 PatternTopic container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result")); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { // 注入 Receiver,指定类中的接受方法 return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver() { return new Receiver(); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
最后我们使用 StringRedisTemplate#convertAndSend
发送消息,同时 Receiver
将会收到一条消息。
@SpringBootApplication public class MessagingRedisApplication { public static void main(String[] args) throws InterruptedException { ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args); StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class); Receiver receiver = ctx.getBean(Receiver.class); while (receiver.getCount() == 0) { template.convertAndSend("pay_result", "Hello from Redis!"); Thread.sleep(500L); } System.exit(0); } }
Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。
今天这里我们不详细解释 Redis Sentinel 详细原理,主要来看下 Redis Sentinel 如何使用发布订阅机制。
Redis Sentinel 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。
如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello
频道发送消息,并且每个 Sentinel 都会订阅这个节点。
这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。
这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。
除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。
以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。
当 Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。
对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),
客户端可以订阅 +switch-master
频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的消息。
redission 开源框架提供一些便捷操作 Redis 的方法,其中比较出名的 redission 基于 Redis 的实现分布式锁。
今天我们来看下 Redis 的实现分布式锁中如何使用 Redis 发布订阅机制,提高加锁的性能。
PS:redission 分布式锁实现原理,可以参考之前写过的文章:
可重入分布式锁的实现方式
Redis 分布式锁,看似简单,其实真不简单
首先我们来看下 redission 加锁的方法:
Redisson redisson = .... RLock redissonLock = redisson.getLock("xxxx"); redissonLock.lock();
RLock
继承自 Java 标准的 Lock
接口,调用 lock
方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被阻塞,直到其他客户端释放这把锁。
这里其实有个问题,当前阻塞的线程如何感知分布式锁已被释放呢?
这里其实有两种实现方法:
第一钟,定时查询分布时锁的状态,一旦查到锁已被释放(Redis 中不存在这个键值),那么就去加锁。
实现伪码如下:
while (true) { boolean result=lock(); if (!result) { Thread.sleep(N); } }
这种方式实现起来起来简单,不过缺点也比较多。
如果定时任务时间过短,将会导致查询次数过多,其实这些都是无效查询。
如果定时任务休眠时间过长,那又会导致加锁时间过长,导致加锁性能不好。
那么第二种实现方案,就是采用服务通知的机制,当分布式锁被释放之后,客户端可以收到锁释放的消息,然后第一时间再去加锁。
这个服务通知的机制我们可以使用 Redis 发布订阅模式。
当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx
(xx 代表锁的名称) 频道,使用异步线程监听消息,然后利用 Java 中 Semaphore
使当前线程进入阻塞。
一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx
发送解锁消息。
等异步线程收到消息,将会调用 Semaphore
释放信号量,从而让当前被阻塞的线程唤醒去加锁。
ps:这里只是简单描述了 redission 加锁部分原理,出于篇幅,这里就不再消息解析源码。
感兴趣的小伙伴可以自己看下 redission 加锁的源码。
通过发布订阅机制,被阻塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。
ps: 这种方式,性能确实提高,但是实现起来的复杂度也很高,这部分源码有点东西,快看晕了。
위 내용은 Redis 게시 및 구독을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!