Heim  >  Artikel  >  Datenbank  >  So implementieren Sie eine reaktionsfähige Redis-Interaktion im Frühjahr

So implementieren Sie eine reaktionsfähige Redis-Interaktion im Frühjahr

PHPz
PHPznach vorne
2023-05-27 17:49:471155Durchsuche

In diesem Artikel wird ein Benutzerdienst simuliert und Redis als Datenspeicherserver verwendet.
Beinhaltet zwei Java Beans, Benutzer und Rechte

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}

Startup

Abhängigkeiten einführen

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

Redis-Konfiguration hinzufügen

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000

SpringBoot-Startup

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

Nach dem Start der Anwendung generiert Spring automatisch ReactiveRedisTemplate (das zugrunde liegende Framework ist Lettuce).
ReactiveRedisTemplate ähnelt RedisTemplate, bietet jedoch eine asynchrone, reaktionsfähige Redis-Interaktionsmethode.
Ich möchte noch einmal betonen, dass reaktive Programmierung asynchron ist und den Thread nach dem Senden einer Redis-Anfrage nicht blockiert und der aktuelle Thread andere Aufgaben ausführen kann.
Nachdem die Redis-Antwortdaten zurückgegeben wurden, plant ReactiveRedisTemplate den Thread zur Verarbeitung der Antwortdaten.
Reaktive Programmierung kann asynchrone Aufrufe implementieren und asynchrone Ergebnisse auf elegante Weise verarbeiten, was ihre größte Bedeutung darstellt.

Serialisierung

Die von ReactiveRedisTemplate verwendete Standardserialisierung ist die JDK-Serialisierung.

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}

Die Methode builder.hashValue gibt die Serialisierungsmethode des Redis-Listenwerts an strings, daher ist es immer noch auf StringRedisSerializer.UTF_8 gesetzt.

Grundlegende Datentypen

ReactiveRedisTemplate unterstützt grundlegende Datentypen wie Redis-Strings, Hashes, Listen, Mengen, geordnete Mengen usw.
Dieser Artikel verwendet Hashes zum Speichern von Benutzerinformationen und Listen zum Speichern von Benutzerrechten. Dieser Artikel geht nicht auf die Verwendung anderer grundlegender Datentypen ein. Die

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}

beanToMap-Methode ist für die Konvertierung der Benutzerklasse in eine Karte verantwortlich.

HyperLogLog

Die Redis HyperLogLog-Struktur kann die Anzahl verschiedener Elemente in einer Sammlung zählen.
Verwenden Sie HyperLogLog, um die Anzahl der Benutzer zu zählen, die sich täglich anmelden.

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}

BitMap

Redis BitMap (Bitmap) verwendet ein Bit, um den Wert oder Status eines Elements darzustellen. Da Bit die kleinste Computerspeichereinheit ist, spart die Verwendung als Speicher Platz.
Verwenden Sie BitMap, um aufzuzeichnen, ob der Benutzer diese Woche eingecheckt hat

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}

Die oberen 48 Bits der Benutzer-ID werden verwendet, um Benutzer in verschiedene Schlüssel zu unterteilen, und die niedrigen 16 Bits werden als Bitmap-Offset-Parameter-Offset verwendet.
Der Offset-Parameter muss größer oder gleich 0 und kleiner als 2^32 sein (Bit-Mapping ist auf 512 MB begrenzt).

Geo

Redis Geo kann geografische Standortinformationen speichern und den geografischen Standort berechnen.
Wenn Sie Lagerinformationen innerhalb eines bestimmten Bereichs finden möchten

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}

warehouse:addressIn dieser Sammlung müssen Sie zuerst die Informationen zum geografischen Standort des Lagers speichern. warehouse:address这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。

Lua

ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}

signin.lua内容如下

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end

Stream

Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。

Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。

下面定义一个Stream消费者,负责处理接收到的权益数据

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}

下面看一下如何发送信息

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}

创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。

Sentinel、Cluster

ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=

spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。

Cluster配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true

如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptiveDie ReactiveGeoOperations#radius-Methode kann Elemente in der Sammlung finden, deren geografischer Standort innerhalb eines bestimmten Bereichs liegt. Sie unterstützt auch Vorgänge wie das Hinzufügen von Elementen zur Sammlung und die Berechnung der geografischen Entfernung zwischen zwei Elementen in der Sammlung.

Lua

ReactiveRedisTemplate kann auch Lua-Skripte ausführen. 🎜Die Benutzer-Check-in-Logik wird durch das folgende Lua-Skript vervollständigt: Wenn der Benutzer heute nicht eingecheckt hat, ist der Check-in zulässig und die Punkte werden um 1 erhöht. Wenn der Benutzer heute bereits eingecheckt hat, erfolgt der Vorgang abgelehnt. Der Inhalt von 🎜rrreee🎜signin.lua lautet wie folgt:🎜rrreee

Stream🎜🎜Redis Stream ist ein neu hinzugefügter Datentyp in der Redis 5.0-Version. Dieser Typ kann Nachrichtenwarteschlangen implementieren und Funktionen für Nachrichtenpersistenz und primäre Backup-Replikation bereitstellen. Er kann sich den Zugriffsort jedes Clients merken und sicherstellen, dass Nachrichten nicht verloren gehen. 🎜🎜Redis basiert auf dem Design von Kafka. In einem Stream können mehrere Verbrauchergruppen vorhanden sein, und in einer Verbrauchergruppe können mehrere Verbraucher vorhanden sein. 🎜Wenn ein Verbraucher in einer Verbrauchergruppe eine Nachricht im Stream konsumiert, wird die Nachricht nicht von anderen Verbrauchern in der Verbrauchergruppe konsumiert. Natürlich kann sie auch von einem Verbraucher in anderen Verbrauchergruppen konsumiert werden. 🎜🎜Im Folgenden wird ein Stream-Verbraucher definiert, der für die Verarbeitung der empfangenen Eigenkapitaldaten verantwortlich ist. 🎜rrreee🎜Sehen wir uns an, wie Informationen gesendet werden. 🎜

Sentinel, Cluster🎜🎜ReactiveRedisTemplate unterstützt auch den Redis Sentinel- und Cluster-Cluster-Modus, Sie müssen nur die Konfiguration anpassen. 🎜Sentinel-Konfiguration ist wie folgt🎜rrreee🎜spring.redis.sentinel.nodes konfiguriert die IP-Adresse und den Port des Sentinel-Knotens, nicht die IP-Adresse und den Port des Redis-Instanzknotens. 🎜🎜Cluster-Konfiguration ist wie folgt🎜rrreee🎜Zum Beispiel ist Knoten2 im Redis-Cluster der Slave-Knoten von Knoten1, und Lettuce speichert diese Informationen zwischen. Wenn Knoten1 ausfällt, aktualisiert Redis-Cluster Knoten2 zum Master-Knoten. Aber Lettuce leitet die Anfrage nicht automatisch an Knoten2 weiter, da sein Puffer nicht geleert wird. 🎜Aktivieren Sie die spring.redis.lettuce.cluster.refresh.adaptive-Konfiguration, um die Cache-Informationen des Redis-Clusters regelmäßig zu aktualisieren, den Knotenstatus des Clients dynamisch zu ändern und ein Failover durchzuführen. 🎜🎜Derzeit gibt es keine Lösung für ReactiveRedisTemplate zur Implementierung von Pipelines und Transaktionen. 🎜

Das obige ist der detaillierte Inhalt vonSo implementieren Sie eine reaktionsfähige Redis-Interaktion im Frühjahr. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen