Heim  >  Artikel  >  Datenbank  >  Was ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?

Was ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?

WBOY
WBOYnach vorne
2023-05-30 11:29:252324Durchsuche

1. Vorwort

1.1. Was ist eine Verzögerungswarteschlange? Der größte Unterschied zwischen einer Verzögerungswarteschlange und einer gewöhnlichen Warteschlange spiegelt sich in ihrem Verzögerungsattribut wider. Die Elemente einer gewöhnlichen Warteschlange werden zuerst verarbeitet die Reihenfolge, in der sie zur Warteschlange hinzugefügt werden, und den Elementen in der Verzögerungswarteschlange wird eine Verzögerungszeit zugewiesen, wenn sie zur Warteschlange hinzugefügt werden, was angibt, dass sie hoffen, nach der angegebenen Zeit verarbeitet zu werden. Die Struktur einer Verzögerungswarteschlange ähnelt eher einer zeitgewichteten geordneten Heap-Struktur als einer herkömmlichen Warteschlange.

1.2. Anwendungsszenarien

In einigen Geschäftsszenarien stoßen wir häufig auf Funktionen, die zu einem bestimmten Zeitpunkt oder nach einem bestimmten Zeitraum ausgeführt werden müssen. Zum Beispiel die folgenden Szenarien:

Erstellen Sie eine neue Bestellung, wenn die Zahlung nicht innerhalb der angegebenen Zeit erfolgt, muss die Mitnahme automatisch storniert werden oder das Taxi wird zehn Minuten vor der voraussichtlichen Ankunftszeit daran erinnert, oder Fahrer, dass die Zeitüberschreitung überschritten wird, bestätigt der Benutzer den Empfang nicht innerhalb der angegebenen Zeit. Die geplante Besprechung wird Sie so bald wie möglich daran erinnern, an der Besprechung teilzunehmen Das Meeting beginnt. Der tägliche Wochenbericht erinnert Sie daran, so bald wie möglich eine halbe Stunde vor Ablauf der Frist einzureichen.

1.3 Warum die Verzögerungswarteschlange verwenden? Die einfachste und effektivste Methode besteht darin, eine geplante Aufgabe zu schreiben, um die Datenbank zu scannen und eine Geschäftsrealisierung zu erreichen. Wenn die Datenmenge Millionen oder mehrere zehn Millionen erreicht, ist es leicht, betroffen zu sein, wenn die Datenbank regelmäßig gescannt wird. Ich glaube, jeder weiß, dass es sehr ineffizient sein wird, die Tabelle regelmäßig zu scannen, wenn das Zeitintervall relativ klein ist, der nächste Scan beginnt, bevor der Scan abgeschlossen ist. Zu diesem Zeitpunkt kann es sehr effektiv sein, eine Verzögerungswarteschlange zu verwenden.

Mehrere Möglichkeiten zur Implementierung der Verzögerungswarteschlange

Quarz-geplante Aufgaben

  • DelayQueue-Verzögerungswarteschlange

  • Redis sortierter Satz Redis

  • Abgelaufener Schlüsselüberwachungsrückruf

  • RabbitMQ Dead Letter Queue

  • RabbitMQ implementiert eine Verzögerungswarteschlange basierend auf dem Plug-in-Rad-Zeit-Rad-Algorithmus. Verwenden Sie die Ablaufzeit der Aufgabe als Bewertung, verwenden Sie die Standardreihenfolgefunktion von zset, ermitteln Sie das Element mit dem kleinsten Bewertungswert (dh die zuletzt abgelaufene Aufgabe) und beurteilen Sie die Systemzeit und die Ablaufzeit der Aufgabe . Wenn die Ablaufzeit erreicht ist, führen Sie das Geschäft aus, löschen Sie die abgelaufene Aufgabe und fahren Sie mit der Beurteilung des nächsten Elements fort. Wenn es nicht abgelaufen ist, wird es für einen bestimmten Zeitraum (z. B. 1 Sekunde) in den Ruhezustand versetzt Wenn es leer ist, schläft es auch eine Zeit lang.

  • Fügen Sie über den Befehl zadd Elemente zur Warteschlangenverzögerungswarteschlange hinzu und legen Sie den Bewertungswert fest, um die Ablaufzeit des Elements anzuzeigen. Fügen Sie der Verzögerungswarteschlange drei Order1, Order2 und Order3 hinzu, die nach 10 Sekunden und 20 Sekunden ablaufen bzw. 30 Sekunden.
  • zadd delayqueue 3 order3

    Der Verbraucher fragt die Warteschlangenverzögerung ab, sortiert die Elemente und vergleicht die Mindestzeit mit der aktuellen Zeit. Wenn sie kleiner als die aktuelle Zeit ist, bedeutet dies, dass der Schlüssel abgelaufen ist und entfernt wurde.
  • /**
     * 消费消息
     */
    public void pollOrderQueue() {
        while (true) {
            Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
            String value = ((Tuple) set.toArray()[0]).getElement();
            int score = (int) ((Tuple) set.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if (nowSecond >= score) {
                jedis.zrem(DELAY_QUEUE, value);
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
            }
            if (jedis.zcard(DELAY_QUEUE) <= 0) {
                System.out.println(sdf.format(new Date()) + " zset empty ");
                return;
            }
            Thread.sleep(1000);
        }
    }
Wir sehen, dass die Ausführungsergebnisse wie erwartet sind:

2020-05-07 13:24:09 hinzufügen fertig.

2020-05-07 13:24:19 entfernter Schlüssel:order1

2020-05-07 13:24:29 entfernter Schlüssel:order2Was ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?2020-05-07 13:24:39 entfernter Schlüssel:order3

2020-05-07 13:24:39 zset empty

3. Redis abgelaufener Schlüssel-Listening-Callback

Redis key Das Ablauf-Callback-Ereignis kann auch den Effekt einer Verzögerung der Warteschlange erzielen. Einfach ausgedrückt aktivieren wir das Ereignis zur Überwachung, ob der Schlüssel abgelaufen ist, und ein Callback-Ereignis wird ausgelöst.

Um notify-keyspace-events Ex zu aktivieren, müssen Sie die Datei redis.conf bearbeiten. notify-keyspace-events Ex

Redis-Abhörkonfiguration, Bean RedisMessageListenerContainer injizieren.


Zweitens konfigurieren Sie den Redis-Listener. Schreiben Sie abschließend die Rückrufmethode zum Abhören des Redis-Schlüsselablaufs.

@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
         return container;
    }
}

Beim Schreiben der Rückruf-Abhörmethode für den Redis-Ablauf muss KeyExpirationEventMessageListener geerbt werden, was dem Abhören von MQ-Nachrichten etwas ähnelt.
rrree

Dieser Code wurde ganz einfach geschrieben. Testen Sie als Nächstes einen Schlüssel im Redis-Cli-Client und legen Sie eine Ablaufzeit von 3 Sekunden fest.

set xiaofu 123 ex 3

Diesen abgelaufenen Schlüssel auf der Konsole erfolgreich überwacht.

Der überwachte abgelaufene Schlüssel ist: xiaofu

4. Geplante Aufgabe von Quartz

Quartz ist ein sehr klassisches Aufgabenplanungs-Framework. Als Redis und RabbitMQ noch nicht weit verbreitet waren, wurde die Funktion zur Stornierung von Bestellungen im Laufe der Zeit durch geplante Aufgaben implementiert. von.

Quartz-Abhängigkeiten importieren

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
      String expiredKey = message.toString();
      System.out.println("监听到key:" + expiredKey + "已过期");
    }
}

Geplante Aufgaben schreiben

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
>在启动类中使用@EnableScheduling注解开启定时任务功能。
```java
@SpringBootApplication
@EnableScheduling
public class DelayQueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayQueueApplication.class, args);
    }
}

5. DelayQueue-Verzögerungswarteschlange

JDK bietet eine Reihe von APIs zum Implementieren von Verzögerungswarteschlangen, die sich in DelayQueue unter dem Paket Java.util.concurrent befinden.

DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。 先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。

要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这哥接口里只有一个getDelay方法,用于设置延期时间。在Order类中,compareTo方法的作用是对队列中的元素进行排列。

public class Order implements Delayed {
/**
 * 延迟时间
 */
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
    this.name = name;
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
    Order Order = (Order) o;
    long diff = this.time - Order.time;
    if (diff <= 0) {
        return -1;
    } else {
        return 1;
    }
}
}

DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。

public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
    Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
    Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
    Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
    DelayQueue<Order> delayQueue = new DelayQueue<>();
    delayQueue.put(Order1);
    delayQueue.put(Order2);
    delayQueue.put(Order3);
    System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    while (delayQueue.size() != 0) {
        /**
         * 取队列头部元素是否过期
         */
        Order task = delayQueue.poll();
        if (task != null) {
            System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name,  
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        }
        Thread.sleep(1000);
    }
}
}

上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。

执行后看到结果如下,Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,至此就用DelayQueue实现了延时队列。

订单延迟队列开始时间:2020-05-06 14:59:09

订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}

订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}

订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}

6、RabbitMQ 延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。

先来认识一下 TTL和 DXL两个概念:

Time To Live(TTL) :

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身

设置队列过期时间,那么队列中所有消息都具有相同的过期时间。可以在队列中为每条消息单独设置过期时间,即使每个消息的TTL不同也可以实现。若队列和队列中消息的TTL同时被设置,则TTL的值以两者中较小的那个为准。如果队列中的消息存储时间超过了预设的TTL过期时间,那么它就会变成Dead Letter(死信)。

Dead Letter Exchanges(DLX)

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

消息或者队列的TTL过期

队列达到最大长度

消息被消费端拒绝(basic.reject or basic.nack)

下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

Was ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?

发送消息时指定消息延迟的时间

public void send(String delayTimes) {
    amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
      // 设置延迟毫秒值
      message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
      return message;
    });
  }
}

设置延迟队列出现死信后的转发规则

/**
   * 延时队列
   */
  @Bean(name = "order.delay.queue")
  public Queue getMessageQueue() {
    return QueueBuilder
        .durable(RabbitConstant.DEAD_LETTER_QUEUE)
        // 配置到期后转发的交换
        .withArgument("x-dead-letter-exchange", "order.close.exchange")
        // 配置到期后转发的路由键
        .withArgument("x-dead-letter-routing-key", "order.close.queue")
        .build();
  }

7、时间轮

前面几种实现延迟队列的方法相对简单,比较易于理解。相比之下,时间轮算法稍微有点抽象。kafka、netty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

先来看一张时间轮的原理图,解读一下时间轮的几个基本概念

Was ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?

wheel :时间轮,图中的圆盘可以看作是钟表的刻度。举个例子,如果一圈round的长度为24秒,共分成8个刻度,那么每个刻度代表3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。指针处于0格,当 round=0 且 index=0 时不会执行任务A,因为 round=0 不符合条件。

所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队列主要用HashedWheelTimer,HashedWheelTimer底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。

下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

ThreadFactory :表示用于生成工作线程,一般采用线程池;

tickDuration和unit:每格的时间间隔,默认100ms;

ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
  }

TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。

Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。 Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。

public class NettyDelayQueue {
  public static void main(String[] args) {
    final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
    //定时任务
    TimerTask task1 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order1 5s 后执行 ");
        timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
      }
    };
    timer.newTimeout(task1, 5, TimeUnit.SECONDS);
    TimerTask task2 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order2 10s 后执行");
        timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
      }
    };
    timer.newTimeout(task2, 10, TimeUnit.SECONDS);
    //延迟任务
    timer.newTimeout(new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order3 15s 后执行一次");
      }
    }, 15, TimeUnit.SECONDS);
  }
}

从执行的结果看,order3、order3延时任务只执行了一次,而order2、order1为定时任务,按照不同的周期重复执行。

order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行

Das obige ist der detaillierte Inhalt vonWas ist die Methode zur Implementierung einer Verzögerungswarteschlange in Redis?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen