Heim  >  Artikel  >  Datenbank  >  Wie Redis die Verzögerungswarteschlange implementiert

Wie Redis die Verzögerungswarteschlange implementiert

WBOY
WBOYnach vorne
2023-05-26 20:39:281558Durchsuche

    Verwenden Sie die

    Abhängigkeitskonfiguration + Redisson automatische Konfiguration [Am einfachsten]

    Zweitens: Verwenden Sie eine separate Redisson-Konfigurationsdatei

      Drittens: Verwenden Sie spring.redis.redisson, um unter dem Konfigurationsschlüssel zu konfigurieren
    • Eine detaillierte Integration finden Sie unter Springboot-Integration Redisson-Konfiguration
    • <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.3.12.RELEASE</version>
              <relativePath/> <!-- lookup parent from repository -->
          </parent>
          <groupId>com.homeey</groupId>
          <artifactId>redis-delay-queue</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <name>redis-delay-queue</name>
          <description>redis-delay-queue</description>
          <properties>
              <java.version>1.8</java.version>
          </properties>
          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-data-redis</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
              <dependency>
                  <groupId>org.redisson</groupId>
                  <artifactId>redisson-spring-boot-starter</artifactId>
                  <version>3.19.3</version>
              </dependency>
              <dependency>
                  <groupId>org.redisson</groupId>
                  <artifactId>redisson-spring-data-23</artifactId>
                  <version>3.19.3</version>
              </dependency>
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-maven-plugin</artifactId>
                      <configuration>
                          <excludes>
                              <exclude>
                                  <groupId>org.projectlombok</groupId>
                                  <artifactId>lombok</artifactId>
                              </exclude>
                          </excludes>
                      </configuration>
                  </plugin>
              </plugins>
          </build>
      </project>

      Demo-Code

      spring:
        redis:
          database: 0
          host: localhost
          port: 6379
          timeout: 10000
          lettuce:
            pool:
              max-active: 8
              max-wait: -1
              min-idle: 0
              max-idle: 8
    • Ausführungseffekt
    Prinzipanalyse

    Aus der RedissonDelayedQueue-Implementierung sehen wir, dass es vier Rollen gibt

    Wie Redis die Verzögerungswarteschlange implementiert

    Wie Redis die Verzögerungswarteschlange implementiertredisson_delay_queue_timeout:xxx, sortierter Satzdatentyp, Alle verzögerten Aufgaben werden nach dem Ablaufzeitstempel der verzögerten Aufgaben gespeichert und sortiert (Zeitstempel beim Senden der Aufgabe + Verzögerungszeit), sodass das erste Element am Anfang der Liste die früheste auszuführende Aufgabe in der gesamten verzögerten Warteschlange ist. Dieses Konzept ist Sehr wichtig

    redisson_delay_queue:xxx, Listendatentyp, ich habe vorerst keine Verwendung gefunden, es wird nur hier geschrieben, wenn die Aufgabe gesendet wird, und die darin enthaltenen Elemente werden gelöscht, wenn Die Warteschlange wird übertragenRedissonDelayedQueue实现中我们看到有四个角色

    Wie Redis die Verzögerungswarteschlange implementiert

    • redisson_delay_queue_timeout:xxx,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要

    • redisson_delay_queue:xxx,list数据类型,暂时没发现什么用,只是在提交任务时会写入这里面,队列转移时又会删除里面的元素

    • xxx:list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的

    • redisson_delay_queue_channel:xxx,是一个channel,用来通知客户端开启一个延迟任务

    队列创建

    RedissonDelayedQueue延迟队列创建时,指定了队列转移服务,以及实现延迟队列的四个重要校色的key。核心代码是指定队列转移任务

    package com.homeey.redisdelayqueue.delay;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 明天的你会因今天到的努力而幸运
     *
     * @author jt4mrg@qq.com
     * 23:11 2023-02-19 2023
     **/
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class RedissonDelayQueue {
    
        private final RDelayedQueue<String> delayedQueue;
        private final RBlockingQueue<String> blockingQueue;
    
    
        @PostConstruct
        public void init() {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            executorService.submit(() -> {
                while (true) {
                    try {
                        String task = blockingQueue.take();
                        log.info("rev delay task:{}", task);
                    } catch (Exception e) {
                        log.error("occur error", e);
                    }
                }
            });
        }
    
        public void offerTask(String task, long seconds) {
            log.info("add delay task:{},delay time:{}s", task, seconds);
            delayedQueue.offer(task, seconds, TimeUnit.SECONDS);
        }
    
    
        @Configuration
        static class RedissonDelayQueueConfigure {
    
            @Bean
            public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
                return redissonClient.getBlockingQueue("TOKEN-RENEWAL");
            }
    
            @Bean
            public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                                      RedissonClient redissonClient) {
                return redissonClient.getDelayedQueue(blockingQueue);
            }
        }
    }

    生产者

    Wie Redis die Verzögerungswarteschlange implementiert

    核心代码RedissonDelayedQueue#offerAsync

    xxx: Listendatentyp, genannt Zielwarteschlange. Die darin gespeicherten Aufgaben sind alle Aufgaben, die die Verzögerungszeit erreicht haben und von Verbrauchern abgerufen werden können, also die RBlockingQueue In der obigen Demo ruft die Take-Methode die Aufgabe aus dieser Zielwarteschlange ab

    🎜🎜redisson_delay_queue_channel:xxx, einem Kanal, der verwendet wird, um den Client zu benachrichtigen, eine verzögerte Aufgabe zu starten🎜🎜🎜🎜Warteschlangenerstellung🎜 🎜RedissonDelayedQueueBeim Erstellen der verzögerten Warteschlange werden der Warteschlangenübertragungsdienst und die vier wichtigen Farbkorrekturschlüssel für die Implementierung der verzögerten Warteschlange angegeben. Der Kerncode besteht darin, die Warteschlangenübertragungsaufgabe anzugeben implementiert Verzögerungswarteschlange" />🎜🎜Core CodeRedissonDelayedQueue#offerAsync🎜
     QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
                
                @Override
                protected RFuture<Long> pushTaskAsync() {
                    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                            "local expiredValues = redis.call(&#39;zrangebyscore&#39;, KEYS[2], 0, ARGV[1], &#39;limit&#39;, 0, ARGV[2]); "//拿到zset中过期的值列表
                          + "if #expiredValues > 0 then " //如果有
                              + "for i, v in ipairs(expiredValues) do "
                                  + "local randomId, value = struct.unpack(&#39;dLc0&#39;, v);"//解构消息,在提交任务时打包的消息
                                  + "redis.call(&#39;rpush&#39;, KEYS[1], value);" //放入无前缀的list 队头
                                  + "redis.call(&#39;lrem&#39;, KEYS[3], 1, v);"//移除带前缀list 队尾元素
                              + "end; "
                              + "redis.call(&#39;zrem&#39;, KEYS[2], unpack(expiredValues));" //移除zset中本次读取的过期元素
                          + "end; "
                            // get startTime from scheduler queue head task
                          + "local v = redis.call(&#39;zrange&#39;, KEYS[2], 0, 0, &#39;WITHSCORES&#39;); "//取zset最小分值的元素
                          + "if v[1] ~= nil then "
                             + "return v[2]; " //返回分值,即过期时间
                          + "end "
                          + "return nil;",
                          Arrays.asList(getRawName(), timeoutSetName, queueName),
                          System.currentTimeMillis(), 100);
                }
                
                @Override
                protected RTopic getTopic() {
                    return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
                }
            };
    🎜Consumer🎜🎜Consumer ist das einfachste, einfach BLPOP direkt aus der Liste ohne Präfix lesen🎜

    Das obige ist der detaillierte Inhalt vonWie Redis die Verzögerungswarteschlange implementiert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen