搜尋
首頁資料庫Redis如何使用Redis的streams

如何使用Redis的streams

Jun 02, 2023 pm 09:42 PM
redisstreams

起源

自從在 Redis 4.0 引入模組後,使用者開始思考如何解決這些問題。其中一個用戶 Timothy Downs 透過 IRC 和我說:

\<forkfork> 我计划给这个模块增加一个事务日志式的数据类型 —— 这意味着大量的订阅者可以在不导致 redis 内存激增的情况下做一些像发布/订阅那样的事情\<forkfork> 订阅者持有他们在消息队列中的位置,而不是让 Redis 必须维护每个消费者的位置和为每个订阅者复制消息</forkfork></forkfork>

他的想法啟發了我。經過幾天的思考,我明白這或許是我們一舉解決所有問題的機會。我需要去重新構思 「日誌」  的概念是什麼。日誌是個基本的程式設計元素,每個人都使用過它,因為它只是簡單地以追加模式開啟一個文件,並以一定的格式寫入資料。然而 Redis  資料結構必須是抽象的。它們在記憶體中,我們使用記憶體並不是因為我們懶,而是因為使用一些指針,我們可以概念化資料結構並把它們抽象,以使它們擺脫明確的限制。例如,一般來說日誌有幾個問題:偏移不是邏輯化的,而是真實的位元組偏移,如果你想要與條目插入的時間相關的邏輯偏移該怎麼辦?我們有範圍查詢可用。同樣,日誌通常很難進行垃圾回收:在一個只能進行追加操作的資料結構中怎麼去刪除舊的元素?好吧,在我們理想的日誌中,我們只需要說,我想要數字***的那個條目,而舊的元素一個也不要,等等。

當我從 Timothy 的想法中受到啟發,去嘗試著寫一個規範的時候,我使用了 Redis 集群中的 radix 樹去實現,優化了它內部的某些部分。這為實現一個有效利用空間的日誌提供了基礎,而且仍然有可能在對數時間logarithmic time內存取範圍。同時,我開始去讀關於Kafka 的流相關的內容以獲得另外的靈感,它也非常適合我的設計,***借鑒了Kafka消費組consumer groups的概念,並且再次針對 Redis 進行最佳化,以適用於Redis  在記憶體中使用的情況。然而,該規範僅停留在紙面上,在一段時間後我幾乎把它從頭到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到  Redis 升級中。我希望 Redis 流能成為對於時間序列有用的特性,而不僅是一個常見的事件和訊息類別的應用程式。

讓我們寫一些程式碼吧

從 Redis 大會回來後,整個夏天我都在實作一個叫 listpack 的函式庫。這個函式庫是 ziplist.c 的繼任者,那是一個表示在單一指派中的字串元素清單的資料結構。它是一個非常特殊的序列化格式,其特點在於也能夠以逆序(從右到左)解析:以便在各種用例中取代 ziplists。

結合 radix 樹和 listpacks 的特性,它可以很容易地去建立一個空間高效的日誌,並且還是可索引的,這意味著允許透過 ID  和時間進行隨機存取。自從這些就緒後,我開始去寫一些程式碼來實現流資料結構。我還在完成這個實現,不管怎樣,現在在 Github 上的 Redis 的  streams 分支裡它已經可以跑起來了。我並沒有聲稱那個API 是100%  的最終版本,但是,這有兩個有趣的事實:一,在那時只有消費群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的面向都已經實現了。二,一旦各方面都比較穩定了之後,我決定大概用兩個月的時間將所有的流的特性向後移植backport到  4.0 分支。這意味著 Redis 用戶想要使用串流,不用等待 Redis 4.2  發布,它們在生產環境馬上就可用了。這是可能的,因為作為一個新的資料結構,幾乎所有的程式碼改變都出現在新的程式碼裡面。除了阻塞列表操作之外:程式碼被重構了,我們對於流和列表阻塞操作共享了相同的程式碼,而大大簡化了  Redis 內部實作。

教學:歡迎使用 Redis 的 streams

在某些方面,你可以認為串流是 Redis 清單的增強版本。流元素不再是單一的字串,而是由欄位fieldvalue所組成的物件。範圍查詢更適用且更快。在流中,每個條目都有一個 ID,它是一個邏輯偏移量。不同的客戶端可以阻塞等待blocking-wait比指定的 ID 更大的元素。 Redis 流的一個基本的命令是 XADD。是的,所有的 Redis 流指令都是以一個 X 為前綴的。

> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0

这个 XADD 命令将追加指定的条目作为一个指定的流 —— “mystream” 的新元素。上面示例中的这个条目有两个字段:sensor-idtemperature,每个条目在同一个流中可以有不同的字段。使用相同的字段名可以更好地利用内存。有意思的是,字段的排序是可以保证顺序的。XADD 仅返回插入的条目的 ID,因为在第三个参数中是星号(*),表示由命令自动生成 ID。通常这样做就够了,但是也可以去强制指定一个 ID,这种情况用于复制这个命令到从服务器slave serverAOFappend-only file文件。

这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177 是毫秒时间,它只是一个毫秒级的 UNIX 时间戳。圆点(.)后面的数字 0  是一个序号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64  位的无符号整数。这意味着,我们可以在流中增加所有想要的条目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的  ID 和流中的***一个条目 ID 两者间的***的一个。因此,举例来说,即使是计算机时间回跳,这个 ID  仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128  位数字。然而,事实上它们与被添加到的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。

正如你想的那样,快速添加两个条目后,结果是仅一个序号递增了。我们可以用一个 MULTI/EXEC 块来简单模拟“快速插入”:

> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1

在上面的示例中,也展示了无需指定任何初始模式schema的情况下,对不同的条目使用不同的字段。会发生什么呢?就像前面提到的一样,只有每个块(它通常包含  50-150  个消息内容)的***个消息被使用。并且,相同字段的连续条目都使用了一个标志进行了压缩,这个标志表示与“它们与这个块中的***个条目的字段相同”。因此,使用相同字段的连续消息可以节省许多内存,即使是字段集随着时间发生缓慢变化的情况下也很节省内存。

为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE 命令实现的;流播streaming,它是通过 XREAD 命令实现的。XRANGE 命令仅取得包括从开始到停止范围内的全部条目。因此,举例来说,如果我知道它的 ID,我可以使用如下的命名取得单个条目:

> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"

不管怎样,你都可以使用指定的开始符号 - 和停止符号 + 表示最小和***的 ID。为了限制返回条目的数量,也可以使用 COUNT 选项。下面是一个更复杂的 XRANGE 示例:

> XRANGE mystream - + COUNT 21) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"2) 1) 1506872463535.0   2) 1) "foo"      2) "10"

这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 XRANGE,因为 ID 的“序号” 部分可以省略。因此,你可以只指定“毫秒”时间即可,下面的命令的意思是:“从 UNIX 时间 1506872463 开始给我 10 个条目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0   2) 1) "foo"      2) "10"2) 1) 1506872463535.1   2) 1) "bar"      2) "20"

关于 XRANGE 需要注意的最重要的事情是,假设我们在回复中收到 ID,随后连续的 ID 只是增加了序号部分,所以可以使用 XRANGE 遍历整个流,接收每个调用的指定个数的元素。Redis 中的*SCAN 系列命令允许迭代 Redis 数据结构,尽管事实上它们不是为迭代设计的,但这样可以避免再犯相同的错误。

使用 XREAD 处理流播:阻塞新的数据

当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去获取单个元素时,使用 XRANGE 是非常***的。然而,在使用流的案例中,当数据到达时,它必须由不同的客户端来消费时,这就不是一个很好的解决方案,这需要某种形式的汇聚池pooling。(对于 某些 应用程序来说,这可能是个好主意,因为它们仅是偶尔连接查询的)

XREAD 命令是为读取设计的,在同一个时间,从多个流中仅指定我们从该流中得到的***条目的  ID。此外,如果没有数据可用,我们可以要求阻塞,当数据到达时,就解除阻塞。类似于阻塞列表操作产生的效果,但是这里并没有消费从流中得到的数据,并且多个客户端可以同时访问同一份数据。

这里有一个典型的 XREAD 调用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:从 mystreamotherstream 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。在 STREAMS 选项之后指定我们想要监听的关键字,***的是指定想要监听的 ID,指定的 ID 为 $ 的意思是:假设我现在需要流中的所有元素,因此,只需要从下一个到达的元素开始给我。

如果我从另一个客户端发送这样的命令:

> XADD otherstream * message “Hi There”

XREAD 侧会出现什么情况呢?

1) 1) "otherstream"   2) 1) 1) 1506935385635.0         2) 1) "message"            2) "Hi There"

与收到的数据一起,我们也得到了数据的关键字。在下次调用中,我们将使用接收到的***消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次类推。然而需要注意的是使用方式,客户端有可能在一个非常大的延迟之后再次连接(因为它处理消息需要时间,或者其它什么原因)。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,以及服务器不会因为给单个客户端提供大量消息而浪费太多的时间,使用 XREADCOUNT 选项是非常明智的。

流封顶

目前看起来还不错……然而,有些时候,流需要删除一些旧的消息。幸运的是,这可以使用 XADD 命令的 MAXLEN 选项去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后发现消息数量超过了 1000000 个,那么就删除旧的消息,以便于元素总量重新回到 1000000 以内。它很像是在列表中使用的 RPUSH + LTRIM,但是,这次我们是使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将消耗一些 CPU 资源,所以在计算 MAXLEN 之前,尽可能使用 ~ 符号,以表明我们不要求非常 精确 的 1000000 个消息,就是稍微多一些也不是大问题:

> XADD mystream MAXLEN ~ 1000000 * foo bar

这种方式的 XADD 仅当它可以删除整个节点的时候才会删除消息。相比普通的 XADD,这种方式几乎可以自由地对流进行封顶。

消费组(开发中)

这是***个 Redis 中尚未实现而在开发中的特性。灵感也是来自 Kafka,尽管在这里是以不同的方式实现的。重点是使用了 XREAD,客户端也可以增加一个 GROUP <name></name> 选项。相同组的所有客户端将自动得到 不同的 消息。当然,同一个流可以被多个组读取。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。

当指定组时,能够指定一个 RETRY <milliseconds></milliseconds> 选项去扩展组:在这种情况下,如果消息没有通过 XACK 进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法将消息标记为已处理。这一部分也正在开发中。

内存使用和节省加载时间

因为用来建模 Redis 流的设计,内存使用率是非常低的。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用 100MB  内存可以有几百万条消息。此外,该格式设想为需要极少的序列化:listpack 块以 radix  树节点方式存储,在磁盘上和内存中都以相同方式表示的,因此它们可以很轻松地存储和读取。例如,Redis 可以在 0.3 秒内从 RDB 文件中读取  500 万个条目。这使流的复制和持久存储非常高效。

我还计划允许从条目中间进行部分删除。现在仅实现了一部分,策略是在条目在标记中标识条目为已删除,并且,当已删除条目占全部条目的比例达到指定值时,这个块将被回收重写,如果需要,它将被连到相邻的另一个块上,以避免碎片化。

以上是如何使用Redis的streams的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
REDIS:鍵值數據存儲的指南REDIS:鍵值數據存儲的指南May 02, 2025 am 12:10 AM

Redis是一個開源的內存數據結構存儲,用作數據庫、緩存和消息代理,適合需要快速響應和高並發的場景。 1.Redis使用內存存儲數據,提供微秒級的讀寫速度。 2.它支持多種數據結構,如字符串、列表、集合等。 3.Redis通過RDB和AOF機制實現數據持久化。 4.使用單線程模型和多路復用技術高效處理請求。 5.性能優化策略包括LRU算法和集群模式。

REDIS:緩存,會話管理等REDIS:緩存,會話管理等May 01, 2025 am 12:03 AM

Redis的功能主要包括緩存、會話管理和其他功能:1)緩存功能通過內存存儲數據,提高讀取速度,適用於電商網站等高頻訪問場景;2)會話管理功能在分佈式系統中共享會話數據,並通過過期時間機制自動清理;3)其他功能如發布-訂閱模式、分佈式鎖和計數器,適用於實時消息推送和多線程系統等場景。

REDIS:探索其核心功能和好處REDIS:探索其核心功能和好處Apr 30, 2025 am 12:22 AM

Redis的核心功能包括內存存儲和持久化機制。 1)內存存儲提供極快的讀寫速度,適用於高性能應用。 2)持久化通過RDB和AOF兩種方式確保數據不丟失,選擇依據應用需求。

REDIS的服務器端操作:它提供的REDIS的服務器端操作:它提供的Apr 29, 2025 am 12:21 AM

Redis'sserver-sedierations offerfunctions andTriggersForexeCutingCompleXoperationsontheserver.1)函數functionsAllowCompOustomoperationsInlua,JavaScript,javaScript,orredis'sscriptinglanguigh,增強效率和增強性。 2)

REDIS:數據庫還是服務器?揭開角色的神秘面紗REDIS:數據庫還是服務器?揭開角色的神秘面紗Apr 28, 2025 am 12:06 AM

redisisbothadatabaseandaserver.1)asadatabase,ituseSin-memorystorageforfastaccess,ifealforreal-timeapplications andCaching.2)Asaserver,ItsupportsPub/submessagingAndluAsessingandluAsessingandluascriptingftingftingftingftingftingftingftingfinteral-timecommunicationandserverserverserverserverserverserverserver-soperations。

REDIS:NOSQL方法的優勢REDIS:NOSQL方法的優勢Apr 27, 2025 am 12:09 AM

Redis是NoSQL數據庫,提供高性能和靈活性。 1)通過鍵值對存儲數據,適合處理大規模數據和高並發。 2)內存存儲和單線程模型確保快速讀寫和原子性。 3)使用RDB和AOF機制進行數據持久化,支持高可用性和橫向擴展。

REDIS:了解其架構和目的REDIS:了解其架構和目的Apr 26, 2025 am 12:11 AM

Redis是一种内存数据结构存储系统,主要用作数据库、缓存和消息代理。它的核心特点包括单线程模型、I/O多路复用、持久化机制、复制与集群功能。Redis在实际应用中常用于缓存、会话存储和消息队列,通过选择合适的数据结构、使用管道和事务、以及进行监控和调优,可以显著提升其性能。

REDIS與SQL數據庫:關鍵差異REDIS與SQL數據庫:關鍵差異Apr 25, 2025 am 12:02 AM

Redis和SQL數據庫的主要區別在於:Redis是內存數據庫,適用於高性能和靈活性需求;SQL數據庫是關係型數據庫,適用於復雜查詢和數據一致性需求。具體來說,1)Redis提供高速數據訪問和緩存服務,支持多種數據類型,適用於緩存和實時數據處理;2)SQL數據庫通過表格結構管理數據,支持複雜查詢和事務處理,適用於電商和金融系統等需要數據一致性的場景。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

MantisBT

MantisBT

Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器