>  기사  >  데이터 베이스  >  Redis 스트림을 사용하는 방법

Redis 스트림을 사용하는 방법

PHPz
PHPz앞으로
2023-06-02 21:42:47606검색

Origin

Redis 4.0에 모듈이 도입된 이후로 사용자들은 이러한 문제를 해결하는 방법에 대해 고민하기 시작했습니다. 사용자 중 한 명인 Timothy Downs는 IRC를 통해 다음과 같이 말했습니다.

\<forkfork> 我计划给这个模块增加一个事务日志式的数据类型 &mdash;&mdash; 这意味着大量的订阅者可以在不导致 redis 内存激增的情况下做一些像发布/订阅那样的事情\<forkfork> 订阅者持有他们在消息队列中的位置,而不是让 Redis 必须维护每个消费者的位置和为每个订阅者复制消息

그의 아이디어는 나에게 영감을 주었습니다. 며칠 동안 고민한 끝에 나는 이것이 우리의 모든 문제를 단번에 해결할 수 있는 기회일 수도 있다는 것을 깨달았습니다. '로그'라는 개념이 무엇인지 다시 생각해 볼 필요가 있습니다. 로깅은 단순히 파일을 추가 모드로 열고 특정 형식으로 데이터를 쓰기 때문에 누구나 사용해 본 기본 프로그래밍 요소입니다. 그러나 Redis 데이터 구조는 추상적이어야 합니다. 그들은 메모리에 있고 우리가 게으르기 때문에 메모리를 사용하는 것이 아니라 일부 포인터를 사용하여 데이터 구조를 개념화하고 명시적인 제약 조건에서 추상화할 수 있기 때문에 메모리를 사용합니다. 예를 들어 일반적으로 로깅에는 몇 가지 문제가 있습니다. 오프셋은 논리적이 아니지만 실제 바이트 오프셋입니다. 항목이 삽입된 시간과 관련된 논리적 오프셋을 원하면 어떻게 됩니까? 범위 쿼리가 가능합니다. 마찬가지로 로그는 종종 가비지 수집이 어렵습니다. 추가만 가능한 데이터 구조에서 오래된 요소를 어떻게 삭제합니까? 글쎄요, 우리의 이상적인 로그에서는 단지 숫자 ***가 있는 항목을 원하고 이전 요소는 포함하지 않는 등의 항목을 원한다고 말할 것입니다.

Timothy의 아이디어에 영감을 받아 사양을 작성하려고 할 때 Redis 클러스터의 기수 트리를 사용하여 구현하고 내부 일부를 최적화했습니다. 이는 로그 시간 내에 범위에 액세스할 수 있는 공간 효율적인 로그를 구현하기 위한 기반을 제공합니다. 동시에 추가적인 영감을 얻기 위해 Kafka의 스트림 관련 콘텐츠를 읽기 시작했습니다. 내 디자인에도 매우 적합합니다. ***Kafka소비자 그룹의 개념을 빌려 다시 Redis에 최적화했습니다. , 메모리에서 Redis를 사용하는 상황에 적용합니다. 그러나 사양은 문서에 남아 있었고 다른 사람들과의 논의에서 받은 많은 제안을 Redis 업그레이드에 통합하기 위해 일정 기간 동안 처음부터 끝까지 사양을 거의 다시 작성했습니다. Redis 스트리밍이 단지 일반적인 이벤트나 메시지 형식의 애플리케이션이 아닌 시계열에 유용한 기능이 되기를 바랍니다. 코드를 작성해 봅시다

Redis 컨퍼런스에서 돌아온 후 여름방학 동안 listpack이라는 라이브러리를 구현했습니다. 이 라이브러리는 단일 할당에서 문자열 요소 목록을 나타내는 데이터 구조인 ziplist.c의 후속 버전입니다. 다양한 사용 사례에서 ziplist를 대체하기 위해 역순(오른쪽에서 왼쪽)으로 구문 분석하는 기능도 갖춘 매우 특별한 직렬화 형식입니다.

기수 트리와 리스트팩의 기능을 결합하면 인덱스도 가능하고 공간 효율적인 로그를 쉽게 구축할 수 있습니다. 즉, ID와 시간에 따른 임의 액세스가 허용됩니다. 이를 통해 스트리밍 데이터 구조를 구현하는 코드를 작성하기 시작했습니다. 아직 구현을 마무리하는 중이지만 이제 Github의 Redis 스트림 분기에서 실행되고 있습니다. API가 100% 최종이라고 주장하는 것은 아니지만 두 가지 흥미로운 사실이 있습니다. 하나는 그 당시 소비자 그룹만 누락되었고 흐름을 조작하는 데 덜 중요한 명령도 있었지만 모든 큰 측면은 달성. 둘째, 모든 측면이 상대적으로 안정되면 약 두 달 안에 모든 스트리밍 기능을 4.0 브랜치로 백포트하기로 결정했습니다. 이는 스트림을 사용하려는 Redis 사용자가 Redis 4.2가 출시될 때까지 기다릴 필요 없이 즉시 프로덕션에서 사용할 수 있음을 의미합니다. 이는 새로운 데이터 구조로서 거의 모든 코드 변경 사항이 새 코드에 나타나기 때문에 가능합니다. 차단 목록 작업 외에도 코드가 리팩터링되었습니다. 스트림 및 목록 차단 작업에 대해 동일한 코드를 공유하므로 Redis의 내부 구현이 크게 단순화됩니다. ziplist.c 的继任者,那是一个表示在单个分配中的字符串元素列表的数据结构。它是一个非常特殊的序列化格式,其特点在于也能够以逆序(从右到左)解析:以便在各种用例中替代 ziplists。

结合 radix 树和 listpacks 的特性,它可以很容易地去构建一个空间高效的日志,并且还是可索引的,这意味着允许通过 ID  和时间进行随机访问。自从这些就绪后,我开始去写一些代码以实现流数据结构。我还在完成这个实现,不管怎样,现在在 Github 上的 Redis 的  streams 分支里它已经可以跑起来了。我并没有声称那个 API 是 100%  的最终版本,但是,这有两个有意思的事实:一,在那时只有消费群组是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已经实现了。二,一旦各个方面比较稳定了之后,我决定大概用两个月的时间将所有的流的特性向后移植backport到  4.0 分支。这意味着 Redis 用户想要使用流,不用等待 Redis 4.2  发布,它们在生产环境马上就可用了。这是可能的,因为作为一个新的数据结构,几乎所有的代码改变都出现在新的代码里面。除了阻塞列表操作之外:该代码被重构了,我们对于流和列表阻塞操作共享了相同的代码,而极大地简化了  Redis 内部实现。

教程:欢迎使用 Redis 的 streams

在某些方面,你可以认为流是 Redis 列表的一个增强版本。流元素不再是一个单一的字符串,而是一个字段fieldvalue组成的对象。范围查询更适用而且更快。在流中,每个条目都有一个 ID,它是一个逻辑偏移量。不同的客户端可以阻塞等待blocking-wait比指定的 ID 更大的元素。Redis 流的一个基本的命令是 XADD。是的,所有的 Redis 流命令都是以一个 X

튜토리얼: Redis 스트림에 오신 것을 환영합니다🎜어떤 면에서는 스트림을 Redis 목록의 향상된 버전으로 생각할 수 있습니다. 스트림 요소는 더 이상 단일 문자열이 아니라 🎜field🎜field🎜🎜 및 🎜value🎜value🎜🎜로 구성된 개체입니다. 범위 쿼리가 더 적용 가능하고 빠릅니다. 스트림에서 각 항목에는 논리적 오프셋인 ID가 있습니다. 다양한 클라이언트는 지정된 것보다 큰 ID를 가진 요소를 🎜block-wait🎜blocking-wait🎜🎜할 수 있습니다. Redis 스트리밍의 기본 명령은 XADD입니다. 예, 모든 Redis 스트리밍 명령에는 X 접두사가 붙습니다. 🎜
> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0

这个 XADD 命令将追加指定的条目作为一个指定的流 —— “mystream” 的新元素。上面示例中的这个条目有两个字段:sensor-idtemperature,每个条目在同一个流中可以有不同的字段。使用相同的字段名可以更好地利用内存。有意思的是,字段的排序是可以保证顺序的。XADD 仅返回插入的条目的 ID,因为在第三个参数中是星号(*),表示由命令自动生成 ID。通常这样做就够了,但是也可以去强制指定一个 ID,这种情况用于复制这个命令到从服务器slave serverAOFappend-only file文件。

这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177 是毫秒时间,它只是一个毫秒级的 UNIX 时间戳。圆点(.)后面的数字 0  是一个序号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64  位的无符号整数。这意味着,我们可以在流中增加所有想要的条目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的  ID 和流中的***一个条目 ID 两者间的***的一个。因此,举例来说,即使是计算机时间回跳,这个 ID  仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128  位数字。然而,事实上它们与被添加到的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。

正如你想的那样,快速添加两个条目后,结果是仅一个序号递增了。我们可以用一个 MULTI/EXEC 块来简单模拟“快速插入”:

> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1

在上面的示例中,也展示了无需指定任何初始模式schema的情况下,对不同的条目使用不同的字段。会发生什么呢?就像前面提到的一样,只有每个块(它通常包含  50-150  个消息内容)的***个消息被使用。并且,相同字段的连续条目都使用了一个标志进行了压缩,这个标志表示与“它们与这个块中的***个条目的字段相同”。因此,使用相同字段的连续消息可以节省许多内存,即使是字段集随着时间发生缓慢变化的情况下也很节省内存。

为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE 命令实现的;流播streaming,它是通过 XREAD 命令实现的。XRANGE 命令仅取得包括从开始到停止范围内的全部条目。因此,举例来说,如果我知道它的 ID,我可以使用如下的命名取得单个条目:

> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"

不管怎样,你都可以使用指定的开始符号 - 和停止符号 + 表示最小和***的 ID。为了限制返回条目的数量,也可以使用 COUNT 选项。下面是一个更复杂的 XRANGE 示例:

> XRANGE mystream - + COUNT 21) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"2) 1) 1506872463535.0   2) 1) "foo"      2) "10"

这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 XRANGE,因为 ID 的“序号” 部分可以省略。因此,你可以只指定“毫秒”时间即可,下面的命令的意思是:“从 UNIX 时间 1506872463 开始给我 10 个条目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0   2) 1) "foo"      2) "10"2) 1) 1506872463535.1   2) 1) "bar"      2) "20"

关于 XRANGE 需要注意的最重要的事情是,假设我们在回复中收到 ID,随后连续的 ID 只是增加了序号部分,所以可以使用 XRANGE 遍历整个流,接收每个调用的指定个数的元素。Redis 中的*SCAN 系列命令允许迭代 Redis 数据结构,尽管事实上它们不是为迭代设计的,但这样可以避免再犯相同的错误。

使用 XREAD 处理流播:阻塞新的数据

当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去获取单个元素时,使用 XRANGE 是非常***的。然而,在使用流的案例中,当数据到达时,它必须由不同的客户端来消费时,这就不是一个很好的解决方案,这需要某种形式的汇聚池pooling。(对于 某些 应用程序来说,这可能是个好主意,因为它们仅是偶尔连接查询的)

XREAD 命令是为读取设计的,在同一个时间,从多个流中仅指定我们从该流中得到的***条目的  ID。此外,如果没有数据可用,我们可以要求阻塞,当数据到达时,就解除阻塞。类似于阻塞列表操作产生的效果,但是这里并没有消费从流中得到的数据,并且多个客户端可以同时访问同一份数据。

这里有一个典型的 XREAD 调用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:从 mystreamotherstream 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。在 STREAMS 选项之后指定我们想要监听的关键字,***的是指定想要监听的 ID,指定的 ID 为 $ 的意思是:假设我现在需要流中的所有元素,因此,只需要从下一个到达的元素开始给我。

如果我从另一个客户端发送这样的命令:

> XADD otherstream * message “Hi There”

XREAD 侧会出现什么情况呢?

1) 1) "otherstream"   2) 1) 1) 1506935385635.0         2) 1) "message"            2) "Hi There"

与收到的数据一起,我们也得到了数据的关键字。在下次调用中,我们将使用接收到的***消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次类推。然而需要注意的是使用方式,客户端有可能在一个非常大的延迟之后再次连接(因为它处理消息需要时间,或者其它什么原因)。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,以及服务器不会因为给单个客户端提供大量消息而浪费太多的时间,使用 XREADCOUNT 选项是非常明智的。

流封顶

目前看起来还不错……然而,有些时候,流需要删除一些旧的消息。幸运的是,这可以使用 XADD 命令的 MAXLEN 选项去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后发现消息数量超过了 1000000 个,那么就删除旧的消息,以便于元素总量重新回到 1000000 以内。它很像是在列表中使用的 RPUSH + LTRIM,但是,这次我们是使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将消耗一些 CPU 资源,所以在计算 MAXLEN 之前,尽可能使用 ~ 符号,以表明我们不要求非常 精确 的 1000000 个消息,就是稍微多一些也不是大问题:

> XADD mystream MAXLEN ~ 1000000 * foo bar

这种方式的 XADD 仅当它可以删除整个节点的时候才会删除消息。相比普通的 XADD,这种方式几乎可以自由地对流进行封顶。

消费组(开发中)

这是***个 Redis 中尚未实现而在开发中的特性。灵感也是来自 Kafka,尽管在这里是以不同的方式实现的。重点是使用了 XREAD,客户端也可以增加一个 GROUP <name> 选项。相同组的所有客户端将自动得到 不同的 消息。当然,同一个流可以被多个组读取。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。

当指定组时,能够指定一个 RETRY <milliseconds> 选项去扩展组:在这种情况下,如果消息没有通过 XACK 进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法将消息标记为已处理。这一部分也正在开发中。

内存使用和节省加载时间

因为用来建模 Redis 流的设计,内存使用率是非常低的。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用 100MB  内存可以有几百万条消息。此外,该格式设想为需要极少的序列化:listpack 块以 radix  树节点方式存储,在磁盘上和内存中都以相同方式表示的,因此它们可以很轻松地存储和读取。例如,Redis 可以在 0.3 秒内从 RDB 文件中读取  500 万个条目。这使流的复制和持久存储非常高效。

我还计划允许从条目中间进行部分删除。现在仅实现了一部分,策略是在条目在标记中标识条目为已删除,并且,当已删除条目占全部条目的比例达到指定值时,这个块将被回收重写,如果需要,它将被连到相邻的另一个块上,以避免碎片化。

위 내용은 Redis 스트림을 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제