How to implement reactive Redis interaction in Spring

PHPz
2023-05-27 17:49:47

This article simulates a user service that uses Redis as its data store.
It involves two Java beans, User and Rights:

public class User {
    private long id;
    private String name;
    // Label
    private String label;
    // Delivery address longitude
    private Double deliveryAddressLon;
    // Delivery address latitude
    private Double deliveryAddressLat;
    // Most recent check-in day
    private String lastSigninDay;
    // Points
    private Integer score;
    // Rights
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}

Startup

Add the dependency

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

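Add the Redis configuration (these are the Spring Boot 2.x property names; Spring Boot 3 uses the spring.data.redis.* prefix instead)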

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000

Spring Boot startup class

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

After the application starts, Spring Boot automatically creates a ReactiveRedisTemplate (backed by the Lettuce client).
ReactiveRedisTemplate is similar to RedisTemplate, but it exposes an asynchronous, reactive API for talking to Redis.
To stress the point: reactive programming is asynchronous. ReactiveRedisTemplate does not block the calling thread after it sends a Redis request, so that thread is free to do other work.
When the Redis response arrives, the callbacks registered on the result are invoked to process it.
The main benefit of reactive programming is that asynchronous calls can be made, and their results handled, in a clean and composable way.
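
The send-then-callback flow described above can be illustrated without Redis at all. The sketch below is only an analogy: it uses the JDK's CompletableFuture rather than Reactor's Mono, and fakeGet is a hypothetical stand-in for a Redis round trip, not part of any Spring API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AsyncCallSketch {

    // Hypothetical stand-in for a Redis GET: the value becomes available
    // about 50 ms later on a pool thread, so the caller never waits here.
    static CompletableFuture<String> fakeGet(String key) {
        return CompletableFuture.supplyAsync(
                () -> "value-of-" + key,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Registers a callback that runs once the "response" has arrived.
    static CompletableFuture<Integer> valueLength(String key) {
        return fakeGet(key).thenApply(String::length);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> length = valueLength("user:1");
        System.out.println("request sent, caller thread is still free");
        // join() blocks only so that this demo prints before the JVM exits.
        System.out.println("length = " + length.join());
    }
}
```

In a real reactive service the final join() would not appear; the framework subscribes to the returned publisher instead.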

Serialization

By default, ReactiveRedisTemplate uses JDK serialization. It can be configured to use JSON serialization instead:

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}

The builder.hashValue method specifies how Redis hash values are serialized. Because the hash values in this article are all strings, it is also set to StringRedisSerializer.UTF_8.

Basic data types

ReactiveRedisTemplate supports the basic Redis data types: strings, hashes, lists, sets, and sorted sets.
This article uses a hash to store user information and a list to store user rights. The other basic data types are not covered here.

public Mono<Boolean> save(User user) {
    // Store the user's scalar fields as a Redis hash under "user:<id>".
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if (user.getRights() != null) {
        // Store the rights in a separate Redis list under "user:rights:<id>".
        // This push is subscribed to on its own, so it runs independently of the returned Mono.
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}

The beanToMap method (not shown in this article) converts a User object into a map of field names to string values.
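
The article does not show beanToMap, so the following is only one possible shape for it. SketchUser is a reduced, hypothetical stand-in for the User bean, and skipping null fields is an assumption of this sketch. The rights list is left out because the article stores it in a separate Redis list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BeanToMapSketch {

    // Reduced stand-in for the article's User bean (hypothetical).
    static final class SketchUser {
        long id;
        String name;
        String label;
        Double deliveryAddressLon;
        Double deliveryAddressLat;
        String lastSigninDay;
        Integer score;
    }

    // Hypothetical beanToMap: every non-null field becomes a String entry,
    // which matches a String serializer for hash keys and hash values.
    static Map<String, String> beanToMap(SketchUser user) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("id", String.valueOf(user.id));
        putIfPresent(map, "name", user.name);
        putIfPresent(map, "label", user.label);
        putIfPresent(map, "deliveryAddressLon", user.deliveryAddressLon);
        putIfPresent(map, "deliveryAddressLat", user.deliveryAddressLat);
        putIfPresent(map, "lastSigninDay", user.lastSigninDay);
        putIfPresent(map, "score", user.score);
        return map;
    }

    // Null fields are skipped so that no null value reaches the serializer.
    private static void putIfPresent(Map<String, String> map, String field, Object value) {
        if (value != null) {
            map.put(field, value.toString());
        }
    }

    public static void main(String[] args) {
        SketchUser user = new SketchUser();
        user.id = 1L;
        user.name = "alice";
        user.score = 10;
        System.out.println(beanToMap(user));
    }
}
```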

HyperLogLog

The Redis HyperLogLog structure estimates the number of distinct elements in a set using a small, fixed amount of memory.
Here HyperLogLog is used to count the distinct users who log in each day:

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}
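
To make the per-day distinct count concrete, here is a minimal in-memory sketch. It is an assumption-laden stand-in, not HyperLogLog itself: it keeps an exact HashSet per daily key, whereas HyperLogLog returns an approximate count in far less memory. The class and method names are hypothetical; only the key prefix comes from the code above.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DailyLoginCountSketch {

    // Exact in-memory stand-in for HyperLogLog: one set of user ids per daily key.
    private final Map<String, Set<Long>> loginsByKey = new HashMap<>();

    // Same key layout as above; LocalDate.toString() yields ISO yyyy-MM-dd.
    static String loginKey(LocalDate day) {
        return "user:login:number:" + day;
    }

    // Records a login; returns true if the user was not yet counted for that day.
    boolean login(LocalDate day, long userId) {
        return loginsByKey.computeIfAbsent(loginKey(day), k -> new HashSet<>()).add(userId);
    }

    // Number of distinct users seen on that day (exact here, approximate in Redis).
    int count(LocalDate day) {
        return loginsByKey.getOrDefault(loginKey(day), Set.of()).size();
    }

    public static void main(String[] args) {
        DailyLoginCountSketch sketch = new DailyLoginCountSketch();
        LocalDate today = LocalDate.now();
        sketch.login(today, 1L);
        sketch.login(today, 1L); // a repeated login is not counted twice
        sketch.login(today, 2L);
        System.out.println(loginKey(today) + " -> " + sketch.count(today));
    }
}
```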

BitMap

A Redis bitmap uses a single bit to represent the value or state of an element. Because a bit is the smallest unit of storage, this is very space-efficient.
Here a bitmap records whether a user has checked in this week:

public void addSignInFlag(long userId) {
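    // Key layout: week index, then the high bits of userId; the ':' keeps the two numbers from running together.
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear() / 7 + ":" + (userId >> 16);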
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}

The high 48 bits of userId select the key, which spreads users across many bitmaps, and the low 16 bits are used as the bit offset within that bitmap, so each key holds at most 65,536 bits (8 KB).
The offset must be greater than or equal to 0 and less than 2^32 (a bitmap is limited to 512 MB).
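
The split of userId into a key bucket and a bit offset can be checked in isolation. This is a minimal sketch with hypothetical helper names; a ':' is placed between the week index and the bucket so the two numbers cannot run together.

```java
final class SignInBitmapKeySketch {

    // High 48 bits of the id: selects which bitmap key the user falls into.
    static long bucket(long userId) {
        return userId >> 16;
    }

    // Low 16 bits of the id: the bit offset inside that bitmap (0..65535).
    static long offset(long userId) {
        return userId & 0xffff;
    }

    // Builds the bitmap key from a day of the year and the user's bucket.
    static String key(int dayOfYear, long userId) {
        return "user:signIn:" + (dayOfYear / 7) + ":" + bucket(userId);
    }

    public static void main(String[] args) {
        long userId = 65_537L; // 2^16 + 1
        System.out.println(key(147, userId) + " offset=" + offset(userId));
    }
}
```

Users whose ids differ only in the low 16 bits share one key, so a single key never grows beyond 65,536 bits.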

Geo

Redis Geo can store geographic positions and run calculations on them.
For example, to find the warehouses within a given distance of the user's delivery address:

public Flux<GeoResult<RedisGeoCommands.GeoLocation<String>>> getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}

The warehouse locations must first be saved in the warehouse:address collection.
The ReactiveGeoOperations#radius method finds the elements of the collection whose position lies within a given range. ReactiveGeoOperations also supports adding elements to the collection and calculating the distance between two elements in it.
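
To show what "within a given range" means numerically, here is a minimal sketch of a great-circle distance check using the haversine formula. It is an illustration only: Redis performs its own distance computation on the server, the Earth radius constant is an assumed mean value, and the class and method names are hypothetical.

```java
final class HaversineSketch {

    // Assumed mean Earth radius in meters; Redis uses its own constant.
    static final double EARTH_RADIUS_METERS = 6_371_008.8;

    // Great-circle distance between two (longitude, latitude) points in degrees.
    static double distanceMeters(double lon1, double lat1, double lon2, double lat2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    // True if the second point lies within radiusMeters of the first.
    static boolean withinRadius(double lon1, double lat1, double lon2, double lat2,
                                double radiusMeters) {
        return distanceMeters(lon1, lat1, lon2, lat2) <= radiusMeters;
    }

    public static void main(String[] args) {
        // One degree of longitude along the equator.
        System.out.println(distanceMeters(0, 0, 1, 0) + " m");
    }
}
```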

Lua

ReactiveRedisTemplate can also execute Lua scripts.
The Lua script below implements the user check-in logic: if the user has not checked in today, the check-in is accepted and the user's points are increased by 1. If the user has already checked in today, the operation is rejected.

public Flux<String> addScore(long userId) {
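    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    // Declare the result type so the script's string reply is mapped to String.
    script.setResultType(String.class);
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));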
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}

The content of signin.lua is as follows:

local score=redis.call('hget','user:'..KEYS[1],'score')
local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')
if(day==KEYS[2])
    then
    return '0'
else
    redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])
    return '1'
end
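
The decision the script makes can be traced with a small in-memory sketch. This is not how the script is executed; it only mirrors the branch logic on a plain map. The names are hypothetical, and treating a missing score as 0 is an assumption of the sketch that the Lua script itself does not make.

```java
import java.util.HashMap;
import java.util.Map;

final class SignInLogicSketch {

    // In-memory stand-in for the Redis hashes stored under "user:<id>".
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // Returns "1" if the check-in is accepted, "0" if the user already checked in
    // on that day. synchronized loosely mirrors Redis running a script atomically.
    synchronized String signIn(long userId, String day) {
        Map<String, String> user = hashes.computeIfAbsent("user:" + userId, k -> new HashMap<>());
        if (day.equals(user.get("lastSigninDay"))) {
            return "0";
        }
        // Assumption: a user without a score field starts from 0.
        int score = Integer.parseInt(user.getOrDefault("score", "0"));
        user.put("score", String.valueOf(score + 1));
        user.put("lastSigninDay", day);
        return "1";
    }

    // Current score of the user, or 0 if the user has never checked in.
    synchronized int score(long userId) {
        Map<String, String> user = hashes.getOrDefault("user:" + userId, Map.of());
        return Integer.parseInt(user.getOrDefault("score", "0"));
    }

    public static void main(String[] args) {
        SignInLogicSketch sketch = new SignInLogicSketch();
        System.out.println(sketch.signIn(1L, "2023-05-27"));
        System.out.println(sketch.signIn(1L, "2023-05-27"));
        System.out.println("score = " + sketch.score(1L));
    }
}
```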

Stream

Redis Stream is a data type introduced in Redis 5.0. It can be used to implement message queues: it supports message persistence and primary-replica replication, tracks each consumer's read position, and helps ensure that messages are not lost.

Redis Streams borrow from the design of Kafka: a stream can have multiple consumer groups, and each consumer group can have multiple consumers.
Once a consumer in a group has consumed a message from the stream, that message is not delivered to the other consumers in the same group. Consumers in other groups can still consume it.
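
The delivery rule just described can be sketched with an in-memory log and one read cursor per group. This is a deliberately simplified model with hypothetical names: it ignores record IDs, acknowledgements, and pending-entry tracking, all of which real Redis Streams provide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConsumerGroupSketch {

    // The stream itself: an append-only log shared by every group.
    private final List<String> log = new ArrayList<>();
    // One read cursor per consumer group (index of the next undelivered message).
    private final Map<String, Integer> nextIndexByGroup = new HashMap<>();

    void add(String message) {
        log.add(message);
    }

    // Delivers the group's next undelivered message to the calling consumer, or
    // returns null when the group has caught up. The consumer name is only a
    // label here; real Redis also records which consumer holds each message.
    String read(String group, String consumer) {
        int next = nextIndexByGroup.getOrDefault(group, 0);
        if (next >= log.size()) {
            return null;
        }
        nextIndexByGroup.put(group, next + 1);
        return log.get(next);
    }

    public static void main(String[] args) {
        ConsumerGroupSketch stream = new ConsumerGroupSketch();
        stream.add("m1");
        stream.add("m2");
        System.out.println("g1/c1: " + stream.read("g1", "c1"));
        System.out.println("g1/c2: " + stream.read("g1", "c2"));
        System.out.println("g2/c1: " + stream.read("g2", "c1"));
    }
}
```

Because the cursor belongs to the group rather than to a consumer, two consumers in the same group never receive the same message, while a second group starts again from the beginning of the log.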

The following defines a Stream consumer that processes the rights data it receives:

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream key
    private static final String STREAM_KEY = "stream:user:rights";
    // Consumer group
    private static final String STREAM_GROUP = "user-service";
    // Consumer
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
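                        .batchSize(100) // maximum number of messages fetched per batch
                        .executor(Executors.newSingleThreadExecutor())  // executor that runs the polling task
                        .pollTimeout(Duration.ZERO) // blocking poll
                        .targetType(Rights.class) // target type (the type of the message payload)
                        .build();
        // Create a message listener container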
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup looks up the stream and creates it if it does not exist
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // Create a consumer for the stream and bind the listener that handles messages
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // Look up the stream; if it does not exist, create it
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info queries the stream; if the stream does not exist, the underlying call fails and onErrorResume is invoked.
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup creates the stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // Message handler
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // Process the message
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}

Next, here is how to send a message:

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}

This creates an ObjectRecord for the message and sends it through ReactiveStreamOperations.

Sentinel, Cluster

ReactiveRedisTemplate also supports Redis Sentinel and Redis Cluster deployments. Only the configuration needs to change.
The Sentinel configuration is as follows:

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=

spring.redis.sentinel.nodes takes the IP addresses and ports of the Sentinel nodes, not those of the Redis instances.

Cluster configuration is as follows

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true

For example, suppose node2 in the Redis Cluster is a replica of node1, and Lettuce has cached this topology. When node1 goes down, Redis Cluster promotes node2 to master, but Lettuce does not switch requests to node2 on its own, because its cached topology has not been refreshed.
With spring.redis.lettuce.cluster.refresh.period set, Lettuce refreshes its cached cluster topology at that interval; with spring.redis.lettuce.cluster.refresh.adaptive enabled, it also refreshes when it detects signs of a topology change, such as redirections or repeated reconnect attempts. The client then updates its view of the nodes and completes the failover.

At the time of writing, ReactiveRedisTemplate offers no way to use pipelining or transactions.
