Rumah >pangkalan data >Redis >Redis aliran jenis data khas
Artikel ini membawakan anda pengetahuan yang berkaitan tentang Redis, yang terutamanya memperkenalkan kandungan strim jenis data khas Redis menyediakan pelbagai jenis data, dan terdapat empat yang istimewa, hyperloglog. geospatial, aliran, mari kita lihat isu berkaitan aliran, saya harap ia akan membantu semua orang.
Pembelajaran yang disyorkan: Tutorial video Redis
Redis Stream ialah jenis data yang baru ditambah dalam versi Redis 5.0 yang pakar dalam mesej barisan Jenis data yang direka bentuk.
Sebelum Redis 5.0 Stream keluar, pelaksanaan baris gilir mesej mempunyai kelemahannya sendiri, seperti:
Terbitkan dan langgan model, yang tidak boleh diteruskan dan oleh itu tidak boleh boleh dipercayai. Kecacatan menyimpan mesej, dan pelanggan yang berada di luar talian dan menyambung semula tidak boleh membaca mesej sejarah
Cara senarai untuk melaksanakan baris gilir mesej tidak boleh digunakan berulang kali, dan mesej akan dipadamkan selepas ia digunakan , dan pengeluar perlu melaksanakan sendiri ID unik secara global.
Berdasarkan isu di atas, Redis 5.0 memperkenalkan jenis Strim, yang juga merupakan ciri terpenting versi ini Ia digunakan untuk melaksanakan baris gilir mesej dengan sempurna penjanaan automatik mesej global ID unik, sokongan untuk mod mesej pengesahan ack, sokongan untuk mod kumpulan pengguna, dll. menjadikan baris gilir mesej lebih stabil dan boleh dipercayai.
Arahan operasi baris gilir mesej strim:
XADD: Sisipkan mesej, dijamin Untuk itu, ID unik secara global boleh dijana secara automatik
XDEL: Padam mesej berdasarkan ID mesej; seluruh Strim;
# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...] 127.0.0.1:6379> XADD s1 * name sid10t "1665047636078-0" 127.0.0.1:6379> XADD s1 * name sidiot "1665047646214-0" # XDEL key id [id ...] 127.0.0.1:6379> XDEL s1 1665047646214-0 (integer) 1 # DEL key [key ...] 127.0.0.1:6379> DEL s1 (integer) 1
XLEN: Tanya panjang mesej; : Untuk membaca Untuk mendapatkan mesej, anda boleh membaca data mengikut ID;
# XLEN key 127.0.0.1:6379> XLEN s1 (integer) 2 # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREAD streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 127.0.0.1:6379> XREAD count 1 streams s1 0-0 1) 1) "s1" 2) 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" # XADD 了一条消息之后的扩展 127.0.0.1:6379> XREAD streams s1 1665047636078-0 1) 1) "s1" 2) 1) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 2) 1) "1665053702766-0" 2) 1) "age" 2) "18" # XRANGE key start end [COUNT count] 127.0.0.1:6379> XRANGE s1 - + 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" 3) 1) "1665053702766-0" 2) 1) "age" 2) "18" 127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0 1) 1) "1665047636078-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665047646214-0" 2) 1) "name" 2) "sidiot" # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count] 127.0.0.1:6379> XLEN s1 (integer) 3 127.0.0.1:6379> XTRIM s1 maxlen 2 (integer) 1 127.0.0.1:6379> XLEN s1 (integer) 2Arahan XACK digunakan untuk mengesahkan kepada baris gilir mesej bahawa pemprosesan mesej telah selesai;
Bahagian kedua mewakili nombor urutan mesej mesej yang dimasukkan dalam milisaat semasa, yang bernombor bermula dari 0. Contohnya, "1665058759764-0" bermaksud mesej pertama dalam milisaat "1665058759764".
Apabila pengguna membaca mesej daripada baris gilir mesej melalui arahan XREAD, mereka boleh menentukan ID mesej dan mula membaca dari mesej seterusnya dengan ID mesej ini (perhatikan bahawa ia adalah mesej input Mula membaca maklumat ID seterusnya, bukan mesej ID input pertanyaan).
Jika anda ingin melaksanakan bacaan menyekat (menyekat apabila tiada data), anda boleh menetapkan item konfigurasi BLOK apabila memanggil XRAED untuk melaksanakan operasi bacaan menyekat serupa dengan BRPOP.# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read] # 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错; 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically. # 0-0 从头开始消费,$ 从尾开始消费; 127.0.0.1:6379> XADD myStream * name sid10t "1665057823181-0" 127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0 OK 127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $ OK # XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] 127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream > 1) 1) "myStream" 2) 1) 1) "1665058086931-0" 2) 1) "name" 2) "sid10t" 2) 1) "1665058090167-0" 2) 1) "name" 2) "sidiot"
Kaedah asas Strim, menggunakan xadd untuk menyimpan mesej dan xread untuk menyekat membaca mesej dalam gelung, boleh melaksanakan versi ringkas baris gilir mesej Proses interaksi adalah seperti yang ditunjukkan dalam rajah di bawah :
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID # 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t 127.0.0.1:6379> XADD mymq * name sid10t "1665058759764-0"Senarai operasi yang diperkenalkan sebelum ini turut disokong. Mari kita lihat fungsi unik Strim. Strim boleh menggunakan XGROUP untuk mencipta kumpulan pengguna Selepas membuat kumpulan pengguna, Strim boleh menggunakan arahan XREADGROUP untuk membenarkan pengguna dalam kumpulan pengguna membaca mesej.
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq > (nil)
但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。
比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665058759764-0" 2) 1) "name" 2) "sid10t"
因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060632864-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060633903-0" 2) 1) "name" 2) "sid10t" # 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息 127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1665060634962-0" 2) 1) "name" 2) "sid10t"
基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:
如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:
127.0.0.1:6379> XPENDING mymq group2 1) (integer) 4 2) "1665058759764-0" 3) "1665060634962-0" 4) 1) 1) "consumer1" 2) "2" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"
如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:
# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息 127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 1) 1) "1665060633903-0" 2) "consumer2" 3) (integer) 1888805 4) (integer) 1
可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。
一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。
127.0.0.1:6379> XACK mymq group2 1665060633903-0 (integer) 1
当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2 (empty array)
好了,基于 Stream 实现的消息队列就说到这里了,小结一下:
消息保序:XADD/XREAD
阻塞读取:XREAD block
重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
支持消费组形式消费数据
Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?
一个专业的消息队列,必须要做到两大块:
消息不可丢。
消息可堆积。
1、Redis Stream 消息会丢失吗?
使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
Redis Stream 消息队列能不能保证三个环节都不丢失数据?
Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:
Kegigihan AOF dikonfigurasikan untuk menulis ke cakera setiap saat, tetapi proses penulisan cakera ini tidak segerak, dan terdapat kemungkinan kehilangan data apabila Redis terputus; replikasi juga tidak segerak Ya, terdapat juga kemungkinan kehilangan data apabila bertukar antara tuan dan hamba (membuka tetingkap baharu).
Seperti yang anda lihat, Redis tidak dapat menjamin bahawa mesej tidak akan hilang dalam pautan perisian tengah baris gilir. Perisian tengah baris gilir profesional seperti RabbitMQ atau Kafka digunakan untuk menggunakan kluster Apabila pengeluar menerbitkan mesej, perisian tengah baris gilir biasanya menulis "berbilang nod", iaitu, terdapat berbilang salinan Dengan cara ini, Walaupun salah satu nod gagal , data dalam kluster boleh dijamin tidak akan hilang.
2. Bolehkah mesej Redis Stream dikumpul? Data Redis disimpan dalam memori, yang bermaksud bahawa sebaik sahaja tunggakan mesej berlaku, memori Redis akan terus berkembang Jika ia melebihi had memori mesin, ia akan menghadapi risiko OOM.
Jadi Redis’ Stream menyediakan fungsi untuk menentukan panjang maksimum baris gilir untuk mengelakkan situasi ini.
Apabila panjang maksimum baris gilir ditentukan, selepas panjang baris gilir melebihi had atas, mesej lama akan dipadamkan dan hanya mesej baharu dengan panjang tetap akan dikekalkan. Dari sudut pandangan ini, jika Strim mempunyai panjang maksimum yang ditentukan apabila mesej tertunggak, mesej mungkin masih hilang.
Tetapi data baris gilir mesej profesional seperti Kafka dan RabbitMQ disimpan pada cakera Apabila mesej tertunggak, ia hanya mengambil lebih banyak ruang cakera.
Oleh itu, apabila menggunakan Redis sebagai baris gilir, anda akan menghadapi dua masalah:
Mekanisme penerbitan/langganan mempunyai kelemahan berikut, semuanya berkaitan dengan data yang hilang:
Mekanisme penerbitan/langganan tidak dilaksanakan berdasarkan sebarang jenis data, jadi ia tidak mempunyai keupayaan "data persistence" ”, iaitu, operasi berkaitan mekanisme penerbitan/langganan, tidak akan ditulis kepada RDB dan AOF Apabila Redis ranap dan dimulakan semula, semua data mekanisme terbitkan/langganan akan hilang.
Mod terbitkan-langgan ialah mod kerja "hantar dan lupakan" Jika pelanggan pergi ke luar talian dan menyambung semula, dia tidak boleh menggunakan mesej sejarah sebelumnya.
Apabila terdapat tunggakan mesej tertentu di pihak pengguna, iaitu mesej yang dihantar oleh pengeluar, dan pengguna tidak boleh menggunakannya, jika melebihi 32M atau kekal melebihi 8M dalam tempoh 60-an, Hujung pengguna akan diputuskan secara paksa Parameter ini ditetapkan dalam fail konfigurasi Nilai lalai ialah pubsub-output-buffer-limit 32mb 8mb 60.
Pembelajaran yang disyorkan:
Tutorial video RedisAtas ialah kandungan terperinci Redis aliran jenis data khas. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!