이 글에서는 특수 데이터 유형 스트림의 관련 콘텐츠를 주로 소개하는 Redis에 대한 관련 지식을 제공합니다. Redis는 비트맵, 하이퍼로그로그, 지리공간, 스트림의 네 가지 특수 데이터 유형을 포함하여 풍부한 데이터 유형을 제공합니다. 스트림 관련 문제에서 모든 분들께 도움이 되기를 바랍니다.
추천 학습: Redis 동영상 튜토리얼
Redis Stream은 Redis 5.0 버전에 새로 추가된 데이터 유형으로 메시지 대기열을 위해 특별히 설계된 데이터 유형입니다.
Redis 5.0 Stream이 출시되기 전에는 메시지 대기열 구현에 모두 다음과 같은 고유한 단점이 있었습니다.
게시 및 구독 모드는 지속할 수 없고 메시지를 안정적으로 저장할 수 없으며 오프라인 재연결 클라이언트에 적합하지 않습니다. 기록 메시지를 읽을 수 없다는 결함;
List의 메시지 대기열 구현 방식은 반복적으로 소비될 수 없습니다. 메시지는 소비된 후에 삭제되며 생산자는 전역적으로 고유한 ID를 직접 구현해야 합니다.
위 문제를 기반으로 Redis 5.0에서는 이번 버전의 가장 중요한 기능이기도 한 Stream 유형을 출시했습니다. 이는 메시지 지속성, 전역 고유 ID 자동 생성을 지원하는 데 사용됩니다. 메시지 확인 모드, 소비자 그룹 모드 등을 지원하여 메시지 대기열을 더욱 안정적이고 신뢰할 수 있게 만듭니다.
스트림 메시지 대기열 작업 명령:
DEL: 전체 스트림 삭제
# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...] 127.0.0.1:6379> XADD s1 * name sid10t "1665047636078-0" 127.0.0.1:6379> XADD s1 * name sidiot "1665047646214-0" # XDEL key id [id ...] 127.0.0.1:6379> XDEL s1 1665047646214-0 (integer) 1 # DEL key [key ...] 127.0.0.1:6379> DEL s1 (integer) 1
xgroup 생성: 소비자 그룹 생성
XACK:
XPENDING 명령은 각 소비자 그룹에 있는 모든 소비자의 "읽었지만 아직 확인되지 않은" 메시지를 쿼리하는 데 사용할 수 있습니다.
# XLEN key 127.0.0.1:6379> XLEN s1 (integer) 2 # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREAD streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 127.0.0.1:6379> XREAD count 1 streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" # XADD 了一条消息之后的扩展 127.0.0.1:6379> XREAD streams s1 1665047636078-0 1) 1) "s1" 2) 1) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 2) 1) "1665053702766-0" 2) 1) "age" 2) "18" # XRANGE key start end [COUNT count] 127.0.0.1:6379> XRANGE s1 - + 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 3) 1) "1665053702766-0" 2) 1) "age" 2) "18" 127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count] 127.0.0.1:6379> XLEN s1 (integer) 3 127.0.0.1:6379> XTRIM s1 maxlen 2 (integer) 1 127.0.0.1:6379> XLEN s1 (integer) 2
삽입에 성공하면 전역 고유 ID: "1665058759764-0"이 반환됩니다. 메시지의 전역 고유 ID는 두 부분으로 구성됩니다.
첫 번째 부분 "1665058759764"는 데이터가 삽입될 때 밀리초 단위로 계산된 현재 서버 시간입니다.
두 번째 부분은 현재 밀리초 내에 삽입된 메시지를 나타냅니다. 0부터 시작하는 일련번호입니다. 예를 들어 "1665058759764-0"은 "1665058759764" 밀리초 내의 첫 번째 메시지를 의미합니다.
소비자가 XREAD 명령을 통해 메시지 대기열에서 메시지를 읽을 때 메시지 ID를 지정하고 이 메시지 ID의 다음 메시지부터 읽기를 시작할 수 있습니다. 입력된 ID를 쿼리하는 메시지).
# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read] # 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错; 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically. # 0-0 从头开始消费,$ 从尾开始消费; 127.0.0.1:6379> XADD myStream * name sid10t "1665057823181-0" 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 OK 127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $ OK # XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream > 1) 1) "myStream" 2) 1) 1) "1665058086931-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665058090167-0" 2) 1) "name" 2) "sidiot"
읽기 차단(데이터가 없을 때 차단)을 구현하려면 XRAED 호출 시 BLOCK 구성 항목을 설정하여 BRPOP과 유사한 읽기 차단 작업을 구현하면 됩니다.
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID # 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t 127.0.0.1:6379> XADD mymq * name sid10t "1665058759764-0"
Stream의 기본 방법은 xadd를 사용하여 메시지를 저장하고 xread를 사용하여 루프에서 메시지 읽기를 차단하여 메시지 대기열의 간단한 버전을 구현합니다.
이 작업 목록은 다음과 같습니다. 이전 버전도 지원됩니다. Stream의 고유한 기능을 살펴보겠습니다. Stream은 XGROUP을 사용하여 소비자 그룹을 생성한 후 XREADGROUP 명령을 사용하여 소비자 그룹의 소비자가 메시지를 읽을 수 있도록 할 수 있습니다.127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0 (nil) 127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0 1) 1) "mymq" 2) 1) 1) "1665058759764-0" 2) 1) "name" 2) "sid10t"소비자 그룹 group1의 Consumer1은 mymq 메시지 대기열에서 모든 메시지를 읽습니다. message 명령은 다음과 같습니다.
# 命令最后的 $ 符号表示读取最新的消息 127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $ (nil) (10.01s)
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq > (nil)
但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。
比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665058759764-0" 2) 1) "name" 2) "sid10t"
因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060632864-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060633903-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060634962-0" 2) 1) "name" 2) "sid10t"
基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:
如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:
127.0.0.1:6379> XPENDING mymq group2 1) (integer) 4 2) "1665058759764-0" 3) "1665060634962-0" 4) 1) 1) "consumer1" 2) "2" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"
如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:
# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息 127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 1) 1) "1665060633903-0" 2) "consumer2" 3) (integer) 1888805 4) (integer) 1
可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。
一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。
127.0.0.1:6379> XACK mymq group2 1665060633903-0 (integer) 1
当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 (empty array)
好了,基于 Stream 实现的消息队列就说到这里了,小结一下:
消息保序:XADD/XREAD
阻塞读取:XREAD block
重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
支持消费组形式消费数据
Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?
一个专业的消息队列,必须要做到两大块:
消息不可丢。
消息可堆积。
1、Redis Stream 消息会丢失吗?
使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
Redis Stream 消息队列能不能保证三个环节都不丢失数据?
Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:
AOF 지속성은 매초 디스크에 쓰도록 구성되어 있지만 이 디스크 쓰기 프로세스는 비동기식이며 Redis가 다운되면 데이터 손실 가능성이 있습니다.
마스터-슬레이브 복제도 비동기식이며, 마스터-슬레이브 전환 시 손실. 데이터 가능성(새 창 열림)
보시다시피 Redis는 대기열 미들웨어 링크에서 메시지가 손실되지 않을 것이라고 보장할 수 없습니다. RabbitMQ나 Kafka와 같은 전문 큐 미들웨어는 클러스터를 배포하는 데 사용됩니다. 생산자가 메시지를 게시할 때 큐 미들웨어는 일반적으로 "다중 노드"를 작성합니다. 즉, 이러한 방식으로 노드 중 하나라도 복사본이 있습니다. 실패하더라도 클러스터의 데이터가 손실되지 않도록 보장할 수 있습니다.
2. Redis 스트림 메시지가 누적될 수 있나요?
Redis 데이터는 메모리에 저장됩니다. 즉, 메시지 백로그가 발생하면 Redis 메모리가 계속해서 머신 메모리 한도를 초과하면 OOM 위험에 직면하게 됩니다.
그래서 Redis의 Stream은 이러한 상황을 방지하기 위해 대기열의 최대 길이를 지정하는 기능을 제공합니다.
큐의 최대 길이를 지정할 때 큐 길이가 상한을 초과하면 오래된 메시지는 삭제되고 고정된 길이의 새 메시지만 유지됩니다. 이러한 관점에서 메시지가 백로그될 때 Stream에 지정된 최대 길이가 있으면 메시지가 계속 손실될 수 있습니다.
하지만 Kafka 및 RabbitMQ와 같은 전문 메시지 대기열의 데이터는 디스크에 저장되며 메시지가 백로그되면 더 많은 디스크 공간을 차지합니다.
따라서 Redis를 대기열로 사용하면 두 가지 문제에 직면하게 됩니다.
Redis 자체가 데이터를 잃을 수 있습니다.
메시지 압박이 발생하면 메모리 리소스가 부족해집니다.
Redis를 메시지 대기열로 사용할 수 있는지 여부는 비즈니스 시나리오에 따라 다릅니다.
비즈니스 시나리오가 충분히 간단하고 데이터 손실에 민감하지 않으며 메시지 백로그 가능성이 상대적으로 작은 경우 Redis를 메시지 대기열로 사용하세요. 대기열을 만드는 것은 전적으로 가능합니다.
비즈니스에 메시지 양이 많고 메시지 백로그 확률이 상대적으로 높고 데이터 손실이 허용되지 않는 경우 전문적인 메시지 큐 미들웨어를 사용하는 것이 좋습니다.
보충: Redis 게시/구독 메커니즘을 메시지 대기열로 사용할 수 없는 이유는 무엇입니까?
게시-구독 메커니즘에는 모두 손실된 데이터와 관련된 다음과 같은 단점이 있습니다.
게시/구독 메커니즘은 데이터 유형을 기반으로 구현되지 않으므로 "데이터 지속성" 기능이 없습니다. 게시/구독 메커니즘과 관련되어 있습니다. Redis가 충돌하고 다시 시작되면 작업이 RDB 및 AOF에 기록되지 않습니다. 게시/구독 메커니즘의 모든 데이터가 손실됩니다.
게시 및 구독 모델은 "보내고 잊어버리기" 작업 모드입니다. 구독자가 오프라인 상태가 되어 다시 연결되면 이전 기록 메시지를 사용할 수 없습니다.
소비자가 특정 메시지 백로그, 즉 생산자가 보낸 메시지를 소비할 수 없는 경우, 32M을 초과하거나 60초 이내에 8M 이상으로 유지되면 강제로 연결이 끊어집니다. 매개변수는 구성 파일에 설정되어 있으며 기본값은 client-output-buffer-limit pubsub 32mb 8mb 60입니다.
따라서 게시/구독 메커니즘은 인스턴트 통신 시나리오에만 적합합니다. 예를 들어 센티넬 클러스터 구축(새 창에서 열림) 시나리오에서는 게시/구독 메커니즘을 사용합니다.
추천 학습: Redis 비디오 튜토리얼
위 내용은 Redis 특수 데이터 유형 스트림의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!