首頁  >  文章  >  資料庫  >  Spring中怎麼實現響應式Redis交互

Spring中怎麼實現響應式Redis交互

PHPz
PHPz轉載
2023-05-27 17:49:471194瀏覽

本文將模擬一個使用者服務,並使用Redis作為資料儲存伺服器。
涉及兩個java bean,使用者與權益

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}

啟動

引入依賴

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

新增Redis配置

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000

SpringBoot啟動

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

應用程式啟動後,Spring會自動產生ReactiveRedisTemplate(它的底層框架是Lettuce)。
ReactiveRedisTemplate與RedisTemplate使用類似,但它提供的是非同步的,響應式Redis互動方式。
這裡再強調一下,響應式程式設計是異步的,ReactiveRedisTemplate發送Redis請求後不會阻塞線程,當前線程可以去執行其他任務。
等到Redis回應資料回傳後,ReactiveRedisTemplate再調度執行緒處理回應資料。
響應式程式設計可以透過優雅的方式實現非同步呼叫以及處理非同步結果,正是它的最大的意義。

序列化

ReactiveRedisTemplate預設使用的序列化是Jdk序列化,我們可以配置為json序列化

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}

builder.hashValue方法指定Redis列表值的序列化方式,由於本文Redis列表值只存放字串,所以還是設定為StringRedisSerializer.UTF_8。

基本資料類型

ReactiveRedisTemplate支援Redis字串,雜湊,列表,集合,有序集合等基本的資料類型。
本文使用散列保存用戶信息,列表保存用戶權益,其他基本數據類型的使用本文不展開。

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}

beanToMap方法負責將User類別轉換為map。

HyperLogLog

Redis HyperLogLog結構可以統計一個集合內不同元素的數量。
使用HyperLogLog統計每天登入的使用者量

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}

BitMap

Redis BitMap(點陣圖)透過一個Bit位元表示某個元素對應的值或狀態。由於Bit是電腦儲存中最小的單位,使用它進行儲存將非常節省空間。
使用BitMap記錄使用者本周是否有簽到

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}

userId高48位元用於將使用者分割到不同的key,低16位元作為位圖偏移參數offset。
offset參數必須大於或等於0,小於2^32(bit 映射被限制在 512 MB 之內)。

Geo

Redis Geo可以儲存地理位置信息,並對地理位置進行計算。
如尋找給定範圍內的倉庫資訊

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}

warehouse:address這個集合中需要先保存好倉庫地理位置資訊。
ReactiveGeoOperations#radius方法可以尋找集合中地理位置在給定範圍內的元素,它中也支援新增元素到集合,計算集合中兩個元素地理位置距離等操作。

Lua

ReactiveRedisTemplate也可以執行Lua腳本。
下面透過Lua腳本完成使用者簽到邏輯:如果使用者今天未簽到,允許簽到,積分加1,如果使用者今天已簽到,則拒接操作。

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}

signin.lua內容如下

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end

Stream

Redis Stream 是 Redis 5.0 版本新增加的資料類型。此類型可以實現訊息佇列,並提供訊息的持久化和主備複製功能,並且可以記住每個客戶端的存取位置,還能保證訊息不會遺失。

Redis借鏡了kafka的設計,一個Stream內可以存在多個消費組,一個消費組內可以存在多個消費者。
如果一個消費組內某個消費者消費了Stream中某條訊息,則這消息不會被該消費組其他消費者消費到,當然,它還可以被其他消費組中某個消費者消費到。

下面定義一個Stream消費者,負責處理接收到的權益資料

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}

下面看一下如何傳送訊息

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}

建立一個訊息​​記錄物件ObjectRecord,並透過ReactiveStreamOperations發送訊息記錄。

Sentinel、Cluster

ReactiveRedisTemplate也支援Redis Sentinel、Cluster叢集模式,只需要調整配置即可。
Sentinel配置如下

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=

spring.redis.sentinel.nodes配置的是Sentinel節點IP位址和端口,不是Redis實例節點IP位址和連接埠。

Cluster配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true

如Redis Cluster中node2是node1的從節點,Lettuce中會快取該訊息,當node1宕機後,Redis Cluster會將node2升級為主節點。但Lettuce不會自動將請求切換到node2,因為它的緩衝沒有刷新。
開啟spring.redis.lettuce.cluster.refresh.adaptive配置,Lettuce可以定時刷新Redis Cluster叢集快取訊息,動態改變客戶端的節點情況,完成故障轉移。

暫時未發現ReactiveRedisTemplate實作pipeline,交易的方案。

以上是Spring中怎麼實現響應式Redis交互的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除