検索
ホームページデータベースRedisRedis が遅延キューを実装する方法

    使用

    依存関係構成

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.12.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.homeey</groupId>
        <artifactId>redis-delay-queue</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>redis-delay-queue</name>
        <description>redis-delay-queue</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-boot-starter</artifactId>
                <version>3.19.3</version>
            </dependency>
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-data-23</artifactId>
                <version>3.19.3</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.projectlombok</groupId>
                                <artifactId>lombok</artifactId>
                            </exclude>
                        </excludes>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    注: redisson と springboot の互換性の問題を処理します

    構成ファイル

    springboot が redisson を統合するには 3 つの方法があります

    • 最初の: 一般的な redis 構成 redisson 自動構成 [最も単純な]

    • 2 番目のタイプ: 別の redisson 構成ファイルを使用します

    • 3 番目のタイプ: spring.redis.redisson を使用して構成キーの下で構成します

    詳細な統合、SpringBoot 統合 Redisson 構成を表示

    spring:
      redis:
        database: 0
        host: localhost
        port: 6379
        timeout: 10000
        lettuce:
          pool:
            max-active: 8
            max-wait: -1
            min-idle: 0
            max-idle: 8

    デモ コード

    package com.homeey.redisdelayqueue.delay;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 明天的你会因今天到的努力而幸运
     *
     * @author jt4mrg@qq.com
     * 23:11 2023-02-19 2023
     **/
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class RedissonDelayQueue {
    
        private final RDelayedQueue<String> delayedQueue;
        private final RBlockingQueue<String> blockingQueue;
    
    
        @PostConstruct
        public void init() {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            executorService.submit(() -> {
                while (true) {
                    try {
                        String task = blockingQueue.take();
                        log.info("rev delay task:{}", task);
                    } catch (Exception e) {
                        log.error("occur error", e);
                    }
                }
            });
        }
    
        public void offerTask(String task, long seconds) {
            log.info("add delay task:{},delay time:{}s", task, seconds);
            delayedQueue.offer(task, seconds, TimeUnit.SECONDS);
        }
    
    
        @Configuration
        static class RedissonDelayQueueConfigure {
    
            @Bean
            public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
                return redissonClient.getBlockingQueue("TOKEN-RENEWAL");
            }
    
            @Bean
            public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                                      RedissonClient redissonClient) {
                return redissonClient.getDelayedQueue(blockingQueue);
            }
        }
    }

    実行効果

    Redis が遅延キューを実装する方法

    原理分析

    からRedissonDelayedQueue実装では 4 つのロールが表示されます

    Redis が遅延キューを実装する方法

    • #redisson_lay_queue_timeout:xxx、ソート セット データ タイプ、ストアすべての遅延タスク。遅延タスクの有効期限タイムスタンプ (タスクが送信されたときのタイムスタンプ遅延時間) に従って並べ替えられるため、リストの先頭の最初の要素が遅延キュー全体で実行される最も早いタスクになります。概念は非常に重要です

    • redisson_delay_queue:xxx、リストデータ型、今のところ使い道が見つかりませんが、提出時にここに書きます。タスク、キュー転送 内部の要素は削除されます

    • xxx: 対象キューと呼ばれるリスト データ型 このキューに格納されているタスクはすべて到達しましたコンシューマーが取得できるタスクであるため、上記のデモの RBlockingQueue の take メソッドは、このターゲット キューからタスク

    • redisson_lay_queue_channel:xxx を取得します。これは、遅延タスクを開始するようにクライアントに通知するために使用されるチャネルです。

    キューの作成

    RedissonDelayedQueue遅延キューが作成されると、キュー転送サービスと、遅延キューを実装するための 4 つの重要な色補正キーが指定されています。コア コードは、キュー転送タスク

     QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
                
                @Override
                protected RFuture<Long> pushTaskAsync() {
                    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                            "local expiredValues = redis.call(&#39;zrangebyscore&#39;, KEYS[2], 0, ARGV[1], &#39;limit&#39;, 0, ARGV[2]); "//拿到zset中过期的值列表
                          + "if #expiredValues > 0 then " //如果有
                              + "for i, v in ipairs(expiredValues) do "
                                  + "local randomId, value = struct.unpack(&#39;dLc0&#39;, v);"//解构消息,在提交任务时打包的消息
                                  + "redis.call(&#39;rpush&#39;, KEYS[1], value);" //放入无前缀的list 队头
                                  + "redis.call(&#39;lrem&#39;, KEYS[3], 1, v);"//移除带前缀list 队尾元素
                              + "end; "
                              + "redis.call(&#39;zrem&#39;, KEYS[2], unpack(expiredValues));" //移除zset中本次读取的过期元素
                          + "end; "
                            // get startTime from scheduler queue head task
                          + "local v = redis.call(&#39;zrange&#39;, KEYS[2], 0, 0, &#39;WITHSCORES&#39;); "//取zset最小分值的元素
                          + "if v[1] ~= nil then "
                             + "return v[2]; " //返回分值,即过期时间
                          + "end "
                          + "return nil;",
                          Arrays.asList(getRawName(), timeoutSetName, queueName),
                          System.currentTimeMillis(), 100);
                }
                
                @Override
                protected RTopic getTopic() {
                    return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
                }
            };

    Producer

    Redis が遅延キューを実装する方法

    コア コード

    RedissonDelayedQueue#offerAsync

     return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                    "local value = struct.pack(&#39;dLc0&#39;, tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息体:消息id,消息长度,消息值
                  + "redis.call(&#39;zadd&#39;, KEYS[2], ARGV[1], value);"//zset中加入消息及其超时分值
                  + "redis.call(&#39;rpush&#39;, KEYS[3], value);" //向带前缀的list中添加消息
                  // if new object added to queue head when publish its startTime 
                  // to all scheduler workers 
                  + "local v = redis.call(&#39;zrange&#39;, KEYS[2], 0, 0); "//取出zset中第一个元素
                  + "if v[1] == value then " //如果最快过期的元素就是这次发送的消息
                     + "redis.call(&#39;publish&#39;, KEYS[4], ARGV[1]); " //channel中发布一下超时时间
                  + "end;",
                  Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
                  timeout, randomId, encode(e));
    ##を指定することです。 #Consumer

    コンシューマーにとって最も簡単な方法は、プレフィックスなしでリストから BLPOP を読み取ることです

    以上がRedis が遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明
    この記事は亿速云で複製されています。侵害がある場合は、admin@php.cn までご連絡ください。
    Redis:キー価値データストアのガイドRedis:キー価値データストアのガイドMay 02, 2025 am 12:10 AM

    Redisは、データベース、キャッシュ、メッセージブローカーとして使用されるオープンソースメモリデータ構造ストレージであり、高速応答と高い並行性が必要なシナリオに適しています。 1.Redisはメモリを使用してデータを保存し、マイクロ秒の読み取り速度と書き込み速度を提供します。 2.文字列、リスト、コレクションなどのさまざまなデータ構造をサポートします。3。Redisは、RDBおよびAOFメカニズムを介してデータの持続性を実現します。 4.シングルスレッドモデルと多重化テクノロジーを使用して、リクエストを効率的に処理します。 5.パフォーマンス最適化戦略には、LRUアルゴリズムとクラスターモードが含まれます。

    Redis:キャッシュ、セッション管理などRedis:キャッシュ、セッション管理などMay 01, 2025 am 12:03 AM

    Redisの関数には、主にキャッシュ、セッション管理、その他の機能が含まれます。1)キャッシュ関数はメモリを介してデータを保存して読み取り速度を向上させ、eコマースWebサイトなどの高周波アクセスシナリオに適しています。 2)セッション管理関数は、分散システムでセッションデータを共有し、有効期限のあるメカニズムを通じて自動的にクレンジングします。 3)リアルタイムメッセージプッシュおよびマルチスレッドシステムおよびその他のシナリオに適した、パブリッシュサブスクライブモード、分散ロック、カウンターなどのその他の機能。

    Redis:そのコア機能と利点の調査Redis:そのコア機能と利点の調査Apr 30, 2025 am 12:22 AM

    Redisのコア関数には、メモリストレージと持続性メカニズムが含まれます。 1)メモリストレージは、高性能アプリケーションに適した非常に高速な読み取り速度と書き込み速度を提供します。 2)永続性は、RDBとAOFによってデータが失われないことを保証し、選択はアプリケーションのニーズに基づいています。

    Redisのサーバー側操作:提供するものRedisのサーバー側操作:提供するものApr 29, 2025 am 12:21 AM

    redis'sserver-sideoperations offferidions and forexuctingcomplexoperationsontheserver.1)機能を調整することで、javascript、orredis'sscriptinglanguage、infulancingscalabilityandmantenmention

    Redis:データベースまたはサーバー?役割を分かりやすいRedis:データベースまたはサーバー?役割を分かりやすいApr 28, 2025 am 12:06 AM

    redisisbothadatabaseandaserver.1)asadatabase、itusesin memorystorage forfastaccess、理想的なforreal-timeapplicationsandcaching.2)asaserver、itupportspub/submessagingandaging andluascriptingforreal-communicationandserver-sideoperation。

    Redis:NOSQLアプローチの利点Redis:NOSQLアプローチの利点Apr 27, 2025 am 12:09 AM

    Redisは、高性能と柔軟性を提供するNOSQLデータベースです。 1)大規模データと高い並行性の処理に適したキー価値ペアを介してデータを保存します。 2)メモリストレージとシングルスレッドモデルは、速い読み取りと書き込みと原子性を確保します。 3)RDBおよびAOFメカニズムを使用してデータを持続し、高可用性とスケールアウトをサポートします。

    Redis:そのアーキテクチャと目的を理解するRedis:そのアーキテクチャと目的を理解するApr 26, 2025 am 12:11 AM

    Redisは、主にデータベース、キャッシュ、メッセージブローカーとして使用されるメモリデータ構造ストレージシステムです。そのコア機能には、シングルスレッドモデル、I/O多重化、持続メカニズム、複製、クラスタリング機能が含まれます。 Redisは、キャッシュ、セッションストレージ、メッセージキューのための実際のアプリケーションで一般的に使用されます。適切なデータ構造を選択し、パイプラインとトランザクションを使用し、監視とチューニングを使用することにより、パフォーマンスを大幅に改善できます。

    Redis vs. SQLデータベース:重要な違​​いRedis vs. SQLデータベース:重要な違​​いApr 25, 2025 am 12:02 AM

    RedisデータベースとSQLデータベースの主な違いは、Redisが高性能および柔軟性要件に適したインメモリデータベースであることです。 SQLデータベースは、複雑なクエリとデータの一貫性要件に適したリレーショナルデータベースです。具体的には、1)Redisは高速データアクセスとキャッシュサービスを提供し、キャッシュおよびリアルタイムのデータ処理に適した複数のデータ型をサポートします。 2)SQLデータベースは、テーブル構造を介してデータを管理し、複雑なクエリとトランザクション処理をサポートし、データの一貫性を必要とするeコマースや金融システムなどのシナリオに適しています。

    See all articles

    ホットAIツール

    Undresser.AI Undress

    Undresser.AI Undress

    リアルなヌード写真を作成する AI 搭載アプリ

    AI Clothes Remover

    AI Clothes Remover

    写真から衣服を削除するオンライン AI ツール。

    Undress AI Tool

    Undress AI Tool

    脱衣画像を無料で

    Clothoff.io

    Clothoff.io

    AI衣類リムーバー

    Video Face Swap

    Video Face Swap

    完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

    ホットツール

    mPDF

    mPDF

    mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

    Safe Exam Browser

    Safe Exam Browser

    Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。

    MantisBT

    MantisBT

    Mantis は、製品の欠陥追跡を支援するために設計された、導入が簡単な Web ベースの欠陥追跡ツールです。 PHP、MySQL、Web サーバーが必要です。デモおよびホスティング サービスをチェックしてください。

    SAP NetWeaver Server Adapter for Eclipse

    SAP NetWeaver Server Adapter for Eclipse

    Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

    VSCode Windows 64 ビットのダウンロード

    VSCode Windows 64 ビットのダウンロード

    Microsoft によって発売された無料で強力な IDE エディター