Maison > Article > base de données > Comment implémenter une interaction Redis réactive au printemps
Cet article simulera un service utilisateur et utilisera Redis comme serveur de stockage de données.
Implique deux beans Java, des utilisateurs et des droits
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权益 private List<Rights> rights; ... } public class Rights { private Long id; private Long userId; private String name; ... }
Introduire des dépendances
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
Ajouter une configuration Redis
spring.redis.host=192.168.56.102 spring.redis.port=6379 spring.redis.password= spring.redis.timeout=5000
Démarrage de SpringBoot
@SpringBootApplication public class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); } }
Après le démarrage de l'application, Spring générera automatiquement ReactiveRedisTemplate (son framework sous-jacent est Lettuce).
ReactiveRedisTemplate est similaire à RedisTemplate, mais il fournit une méthode d'interaction Redis asynchrone et réactive.
Je voudrais souligner à nouveau que la programmation réactive est asynchrone. ReactiveRedisTemplate ne bloquera pas le thread après l'envoi d'une requête Redis et le thread actuel peut effectuer d'autres tâches.
Une fois les données de réponse Redis renvoyées, ReactiveRedisTemplate planifie ensuite le thread pour traiter les données de réponse.
La programmation réactive peut implémenter des appels asynchrones et traiter les résultats asynchrones de manière élégante, ce qui est sa plus grande importance.
La sérialisation par défaut utilisée par ReactiveRedisTemplate est la sérialisation Jdk. Nous pouvons la configurer comme méthode de sérialisation json
@Bean public RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build(); } @Bean public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate; }
builder.hashValue pour spécifier la méthode de sérialisation de la valeur de la liste Redis puisque la valeur de la liste Redis dans cet article ne stocke que les chaînes. , Il est donc toujours défini sur StringRedisSerializer.UTF_8.
ReactiveRedisTemplate prend en charge les types de données de base tels que les chaînes Redis, les hachages, les listes, les ensembles, les ensembles ordonnés, etc.
Cet article utilise des hachages pour enregistrer les informations utilisateur et des listes pour enregistrer les droits des utilisateurs. Cet article ne développe pas l'utilisation d'autres types de données de base. La méthode
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs; }
beanToMap est responsable de la conversion de la classe User en carte.
La structure Redis HyperLogLog peut compter le nombre d'éléments différents dans une collection.
Utilisez HyperLogLog pour compter le nombre d'utilisateurs se connectant chaque jour
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId()); }
Redis BitMap (bitmap) utilise un Bit pour représenter la valeur ou l'état correspondant à un élément. Étant donné que Bit est la plus petite unité de stockage informatique, son utilisation pour le stockage permettra d'économiser de l'espace.
Utilisez BitMap pour enregistrer si l'utilisateur s'est enregistré cette semaine
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b)); }
Les 48 bits élevés de userId sont utilisés pour diviser les utilisateurs en différentes clés, et les 16 bits faibles sont utilisés comme décalage du paramètre de décalage bitmap.
Le paramètre offset doit être supérieur ou égal à 0 et inférieur à 2^32 (le bit mapping est limité à 512 Mo).
Redis Geo peut stocker des informations de localisation géographique et calculer la localisation géographique.
Si vous souhaitez trouver des informations sur l'entrepôt dans une plage donnée
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args); }
warehouse:address
Dans cette collection, vous devez d'abord enregistrer les informations de localisation géographique de l'entrepôt. warehouse:address
这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys); }
signin.lua内容如下
local score=redis.call('hget','user:'..KEYS[1],'score') local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay') if(day==KEYS[2]) then return '0' else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1' end
Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个Stream消费者,负责处理接收到的权益数据
@Component public class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } } }
下面看一下如何发送信息
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono; }
创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下
spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379 spring.redis.sentinel.password=
spring.redis.sentinel.nodes
配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379 spring.redis.lettuce.cluster.refresh.period=10000 spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive
La méthode ReactiveGeoOperations#radius peut trouver des éléments de la collection dont l'emplacement géographique se situe dans une plage donnée. Elle prend également en charge des opérations telles que l'ajout d'éléments à la collection et le calcul de la distance géographique entre deux éléments de la collection.
spring.redis.sentinel.nodes
configure l'adresse IP et le port du nœud Sentinel, et non l'adresse IP et le port du nœud d'instance Redis. 🎜🎜La configuration du cluster est la suivante🎜rrreee🎜Par exemple, le nœud2 dans Redis Cluster est le nœud esclave du nœud1, et Lettuce mettra en cache ces informations lorsque le nœud1 tombe en panne, Redis Cluster mettra à niveau le nœud2 vers le nœud maître. Mais Lettuce ne basculera pas automatiquement la requête vers node2 car son tampon n'est pas vidé. 🎜Activez la configuration spring.redis.lettuce.cluster.refresh.adaptive
. Lettuce peut régulièrement actualiser les informations du cache du cluster Redis, modifier dynamiquement l'état du nœud du client et effectuer le basculement. 🎜🎜Il n'existe actuellement aucune solution permettant à ReactiveRedisTemplate d'implémenter un pipeline et des transactions. 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!