이 기사에서는 사용자 서비스를 시뮬레이션하고 Redis를 데이터 스토리지 서버로 사용합니다.
두 개의 Java Bean, 사용자 및 권한이 포함됩니다
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权益 private List<Rights> rights; ... } public class Rights { private Long id; private Long userId; private String name; ... }
종속성 소개
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
Redis 구성 추가
spring.redis.host=192.168.56.102 spring.redis.port=6379 spring.redis.password= spring.redis.timeout=5000
SpringBoot 시작
@SpringBootApplication public class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); } }
애플리케이션이 시작된 후 Spring은 자동으로 ReactiveRedisTemplate을 생성합니다(기본 프레임워크는 Lettuce입니다).
ReactiveRedisTemplate은 RedisTemplate과 유사하지만 비동기식, 반응형 Redis 상호 작용 방법을 제공합니다.
Reactive 프로그래밍은 비동기식이라는 점을 다시 강조하고 싶습니다. ReactiveRedisTemplate은 Redis 요청을 보낸 후 스레드를 차단하지 않으며 현재 스레드는 다른 작업을 수행할 수 있습니다.
Redis 응답 데이터가 반환된 후 ReactiveRedisTemplate은 응답 데이터를 처리하도록 스레드를 예약합니다.
반응형 프로그래밍은 비동기 호출을 구현하고 비동기 결과를 우아한 방식으로 처리할 수 있다는 것이 가장 큰 의미입니다.
ReactiveRedisTemplate에서 사용하는 기본 직렬화는 Jdk 직렬화입니다. 이 문서의 Redis 목록 값은 문자열만 저장하므로 이를 json 직렬화
@Bean public RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build(); } @Bean public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate; }
builder.hashValue 메서드로 구성할 수 있습니다. , 따라서 여전히 StringRedisSerializer.UTF_8로 설정되어 있습니다.
ReactiveRedisTemplate은 Redis 문자열, 해시, 목록, 집합, 순서 집합 등과 같은 기본 데이터 유형을 지원합니다.
이 문서에서는 해시를 사용하여 사용자 정보를 저장하고 목록을 사용하여 사용자 권한을 저장합니다. 이 문서에서는 다른 기본 데이터 유형의 사용에 대해 확장하지 않습니다.
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs; }
beanToMap 메소드는 User 클래스를 맵으로 변환하는 역할을 담당합니다.
Redis HyperLogLog 구조는 컬렉션의 다양한 요소 수를 계산할 수 있습니다.
HyperLogLog를 사용하여 매일 로그인하는 사용자 수 계산
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId()); }
Redis BitMap(비트맵)은 Bit를 사용하여 요소에 해당하는 값 또는 상태를 나타냅니다. 비트는 컴퓨터 저장 공간의 가장 작은 단위이므로 이를 저장 공간으로 활용하면 공간이 절약됩니다.
BitMap을 사용하여 이번 주에 사용자가 체크인했는지 기록합니다
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b)); }
userId의 상위 48비트는 사용자를 여러 키로 나누는 데 사용되고 하위 16비트는 비트맵 오프셋 매개변수 오프셋으로 사용됩니다.
오프셋 매개변수는 0보다 크거나 같고 2^32보다 작아야 합니다(비트 매핑은 512MB로 제한됨).
Redis Geo는 지리적 위치 정보를 저장하고 지리적 위치를 계산할 수 있습니다.
특정 범위 내의 창고 정보를 찾으려면
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args); }
창고:주소
이 컬렉션에서 먼저 창고 지리적 위치 정보를 저장해야 합니다. warehouse:address
这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys); }
signin.lua内容如下
local score=redis.call('hget','user:'..KEYS[1],'score') local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay') if(day==KEYS[2]) then return '0' else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1' end
Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个Stream消费者,负责处理接收到的权益数据
@Component public class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } } }
下面看一下如何发送信息
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono; }
创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下
spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379 spring.redis.sentinel.password=
spring.redis.sentinel.nodes
配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379 spring.redis.lettuce.cluster.refresh.period=10000 spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive
ReactiveGeoOperations#radius 메소드는 컬렉션에서 지리적 위치가 지정된 범위 내에 있는 요소를 찾을 수 있습니다. 또한 컬렉션에 요소를 추가하고 컬렉션에 있는 두 요소 사이의 지리적 거리를 계산하는 등의 작업도 지원합니다.
spring.redis.sentinel.nodes
는 Redis 인스턴스 노드 IP 주소 및 포트가 아닌 Sentinel 노드 IP 주소 및 포트를 구성합니다. 🎜🎜클러스터 구성은 다음과 같습니다🎜rrreee🎜예를 들어 Redis Cluster의 node2는 node1의 슬레이브 노드이고, Lettuce는 node1이 다운되면 node2를 마스터 노드로 업그레이드합니다. 그러나 Lettuce는 버퍼가 플러시되지 않기 때문에 요청을 node2로 자동 전환하지 않습니다. 🎜spring.redis.lettuce.cluster.refresh.adaptive
구성을 활성화하세요. Lettuce는 정기적으로 Redis 클러스터 클러스터 캐시 정보를 새로 고치고, 클라이언트의 노드 상태를 동적으로 변경하고, 장애 조치를 완료할 수 있습니다. 🎜🎜현재 ReactiveRedisTemplate에서 파이프라인과 트랜잭션을 구현하는 솔루션은 없습니다. 🎜위 내용은 Spring에서 반응형 Redis 상호작용을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!