>데이터 베이스 >Redis >Spring에서 반응형 Redis 상호작용을 구현하는 방법

Spring에서 반응형 Redis 상호작용을 구현하는 방법

PHPz
PHPz앞으로
2023-05-27 17:49:471234검색

이 기사에서는 사용자 서비스를 시뮬레이션하고 Redis를 데이터 스토리지 서버로 사용합니다.
두 개의 Java Bean, 사용자 및 권한이 포함됩니다

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}

Startup

종속성 소개

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

Redis 구성 추가

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000

SpringBoot 시작

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

애플리케이션이 시작된 후 Spring은 자동으로 ReactiveRedisTemplate을 생성합니다(기본 프레임워크는 Lettuce입니다).
ReactiveRedisTemplate은 RedisTemplate과 유사하지만 비동기식, 반응형 Redis 상호 작용 방법을 제공합니다.
Reactive 프로그래밍은 비동기식이라는 점을 다시 강조하고 싶습니다. ReactiveRedisTemplate은 Redis 요청을 보낸 후 스레드를 차단하지 않으며 현재 스레드는 다른 작업을 수행할 수 있습니다.
Redis 응답 데이터가 반환된 후 ReactiveRedisTemplate은 응답 데이터를 처리하도록 스레드를 예약합니다.
반응형 프로그래밍은 비동기 호출을 구현하고 비동기 결과를 우아한 방식으로 처리할 수 있다는 것이 가장 큰 의미입니다.

Serialization

ReactiveRedisTemplate에서 사용하는 기본 직렬화는 Jdk 직렬화입니다. 이 문서의 Redis 목록 값은 문자열만 저장하므로 이를 json 직렬화

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}

builder.hashValue 메서드로 구성할 수 있습니다. , 따라서 여전히 StringRedisSerializer.UTF_8로 설정되어 있습니다.

기본 데이터 유형

ReactiveRedisTemplate은 Redis 문자열, 해시, 목록, 집합, 순서 집합 등과 같은 기본 데이터 유형을 지원합니다.
이 문서에서는 해시를 사용하여 사용자 정보를 저장하고 목록을 사용하여 사용자 권한을 저장합니다. 이 문서에서는 다른 기본 데이터 유형의 사용에 대해 확장하지 않습니다.

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}

beanToMap 메소드는 User 클래스를 맵으로 변환하는 역할을 담당합니다.

HyperLogLog

Redis HyperLogLog 구조는 컬렉션의 다양한 요소 수를 계산할 수 있습니다.
HyperLogLog를 사용하여 매일 로그인하는 사용자 수 계산

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}

BitMap

Redis BitMap(비트맵)은 Bit를 사용하여 요소에 해당하는 값 또는 상태를 나타냅니다. 비트는 컴퓨터 저장 공간의 가장 작은 단위이므로 이를 저장 공간으로 활용하면 공간이 절약됩니다.
BitMap을 사용하여 이번 주에 사용자가 체크인했는지 기록합니다

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}

userId의 상위 48비트는 사용자를 여러 키로 나누는 데 사용되고 하위 16비트는 비트맵 오프셋 매개변수 오프셋으로 사용됩니다.
오프셋 매개변수는 0보다 크거나 같고 2^32보다 작아야 합니다(비트 매핑은 512MB로 제한됨).

Geo

Redis Geo는 지리적 위치 정보를 저장하고 지리적 위치를 계산할 수 있습니다.
특정 범위 내의 창고 정보를 찾으려면

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}

창고:주소이 컬렉션에서 먼저 창고 지리적 위치 정보를 저장해야 합니다. warehouse:address这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。

Lua

ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}

signin.lua内容如下

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end

Stream

Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。

Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。

下面定义一个Stream消费者,负责处理接收到的权益数据

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}

下面看一下如何发送信息

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}

创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。

Sentinel、Cluster

ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=

spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。

Cluster配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true

如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptiveReactiveGeoOperations#radius 메소드는 컬렉션에서 지리적 위치가 지정된 범위 내에 있는 요소를 찾을 수 있습니다. 또한 컬렉션에 요소를 추가하고 컬렉션에 있는 두 요소 사이의 지리적 거리를 계산하는 등의 작업도 지원합니다.

Lua

ReactiveRedisTemplate은 Lua 스크립트도 실행할 수 있습니다. 🎜아래 Lua 스크립트를 통해 사용자 체크인 로직이 완성됩니다. 사용자가 오늘 체크인하지 않았다면 체크인이 허용되고 포인트가 1 증가합니다. 사용자가 오늘 이미 체크인했다면 작업은 다음과 같습니다. 거부되었습니다. 🎜rrreee🎜signin.lua 내용은 다음과 같습니다🎜rrreee

Stream🎜🎜Redis Stream은 Redis 5.0 버전에 새롭게 추가된 데이터 타입입니다. 이 유형은 메시지 대기열을 구현하고 메시지 지속성 및 기본-대기 복제 기능을 제공하며 각 클라이언트의 액세스 위치를 기억하고 메시지가 손실되지 않도록 보장할 수 있습니다. 🎜🎜Redis는 kafka의 디자인을 따릅니다. 하나의 스트림에 여러 소비자 그룹이 존재할 수 있고 하나의 소비자 그룹에 여러 소비자가 존재할 수 있습니다. 🎜Consumer 그룹의 Consumer가 Stream의 메시지를 소비하는 경우 해당 Consumer Group의 다른 Consumer는 해당 메시지를 소비하지 않습니다. 물론 다른 Consumer Group의 Consumer도 해당 메시지를 소비할 수 있습니다. 🎜🎜다음은 수신된 자산 데이터 처리를 담당하는 Stream 소비자를 정의합니다.🎜rrreee🎜정보를 보내는 방법을 살펴보겠습니다.🎜rrreee🎜메시지 레코드 객체 ObjectRecord를 생성하고 ReactiveStreamOperations를 통해 정보 레코드를 보냅니다. 🎜

Sentinel, Cluster🎜🎜ReactiveRedisTemplate은 Redis Sentinel 및 Cluster 클러스터 모드도 지원하므로 구성만 조정하면 됩니다. 🎜Sentinel 구성은 다음과 같습니다🎜rrreee🎜spring.redis.sentinel.nodes는 Redis 인스턴스 노드 IP 주소 및 포트가 아닌 Sentinel 노드 IP 주소 및 포트를 구성합니다. 🎜🎜클러스터 구성은 다음과 같습니다🎜rrreee🎜예를 들어 Redis Cluster의 node2는 node1의 슬레이브 노드이고, Lettuce는 node1이 다운되면 node2를 마스터 노드로 업그레이드합니다. 그러나 Lettuce는 버퍼가 플러시되지 않기 때문에 요청을 node2로 자동 전환하지 않습니다. 🎜spring.redis.lettuce.cluster.refresh.adaptive 구성을 활성화하세요. Lettuce는 정기적으로 Redis 클러스터 클러스터 캐시 정보를 새로 고치고, 클라이언트의 노드 상태를 동적으로 변경하고, 장애 조치를 완료할 수 있습니다. 🎜🎜현재 ReactiveRedisTemplate에서 파이프라인과 트랜잭션을 구현하는 솔루션은 없습니다. 🎜

위 내용은 Spring에서 반응형 Redis 상호작용을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제