首頁 >資料庫 >Redis >EMQ X Redis資料持久化怎麼實現

EMQ X Redis資料持久化怎麼實現

WBOY
WBOY轉載
2023-06-02 11:43:371833瀏覽

EMQ X 資料持久化簡介

資料持久化的主要使用場景包括將客戶端上離線狀態,訂閱主題訊息,訊息內容,訊息抵達後發送訊息回執等作業記錄到Redis、MySQL 、PostgreSQL、MongoDB、Cassandra、AWS DynamoDB 等各類別資料庫中供外部服務快速查詢或在服務宕機/客戶端異常離線時保留目前運作狀態,連線復原時復原到先前狀態;持久化也可用於客戶端代理訂閱,設備用戶端上線時,持久化模組直接從資料庫載入預設的主題並完成代理訂閱,降低系統設計複雜度和減少客戶端訂閱通訊開銷。

使用者也可以透過訂閱相關主題的方式來實現類似的功能,但是在企業版中內建的這些持久化的支援執行效率更高、可靠性更強,大大降低了開發者的工作量並提升了系統穩定性。

資料持久化是 EMQ X 的重要功能,僅在企業版支援。

持久化設計

持久化原理是配置事件鉤子觸發時調用處理函數(action),處理函數獲取到相應的資料後按照配置的指令進行處理,實現資料的增、刪、改、查。相同事件鉤子在不同資料庫中可用參數是一樣的,但處理函數(action)因資料庫特性不同而有所差異。整個持久化工作模式和流程如下:

一對一訊息儲存

EMQ X Redis数据持久化怎么实现

  1. Publish 端發布一則訊息;

  2. Backend 將訊息記錄資料庫中;

  3. Subscribe 端訂閱主題;

  4. Backend 從資料庫取得該主題的訊息;

  5. 發送訊息給Subscribe 端;

  6. Subscribe 端確認後Backend 從資料庫移除該訊息;

一對多訊息儲存

EMQ X Redis数据持久化怎么实现

  1. #PUB 端發布一則訊息;

  2. #Backend 將訊息記錄在資料庫中;

  3. SUB1 和SUB2 訂閱主題;

  4. Backend 從資料庫中取得該主題的訊息;

  5. 發送訊息給SUB1 和SUB2;

  6. Backend 記錄SUB1 和SUB2 已讀取訊息位置,下次取得訊息從該位置開始。

Redis 資料持久化

本文以實際範例來說明如何透過 Redis 來儲存相關的資訊。

Redis 是完全開源免費遵守 BSD 協議的高效能 key-value 資料庫。

比較其他 key-value 快取產品 Redis 有以下特點:

  • #Redis 效能極高,單機支援十萬等級的讀寫速度。

  • Redis 支援資料的持久化,可以將記憶體中的資料保存在磁碟中,重啟的時候可以再次載入進行使用。

  • Redis 不僅僅支援簡單的 key-value 類型的數據,同時還提供 list,set,zset,hash 等資料結構的儲存。

  • Redis 支援資料的備份,即 master-slave 模式的資料備份。

讀者可以參考Redis 官方的Quick Start 來安裝Redis(寫本文的時候,Redis 版本為5.0),透過redis-server 指令來啟動Redis 伺服器。

設定EMQ X 伺服器

透過RPM 方式安裝的EMQ X,Redis 相關的設定檔位於/etc/emqx/plugins/emqx_backend_redis.conf,如果只是測試Redis 持久化的功能,大部分設定不需要做更改。唯一需要更改的地方可能是 Redis 伺服器的位址:如果讀者安裝的 Redis 不與 EMQ X 在同一台伺服器上,請指定正確的 Redis 伺服器的位址與連接埠。如下所示:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

保持剩下部分的設定檔不變,然後啟動該外掛程式:

emqx_ctl plugins load emqx_backend_redis

客戶端線上狀態儲存

客戶端上下線時,更新線上狀態、上下線時間、節點用戶端清單至Redis 資料庫。

儘管 EMQ X 本身提供了設備在線狀態 API,但在需要頻繁獲取客戶端在線狀態、上上下線時間的場景下,直接從資料庫獲取該記錄比調用 EMQ X API 更有效率。

設定項目

開啟設定文件,設定Backend 規則:

## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

使用範例

瀏覽器開啟http://127.0.0.1 :18083 EMQ X 管理控制台,在工具 -> Websocket 中新建一個客戶端連接,指定clientid 為sub_client:

EMQ X Redis数据持久化怎么实现

#開啟redis-cli 命令列窗口,執行指令keys *,結果如下所示,讀者可以看到在Redis 中儲存了兩個key:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

连接列表

插件以 mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

字段说明:

## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"

连接详细信息

插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

字段说明:

## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳

客户端代理订阅

当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

配置项

打开配置文件,配置 Backend 规则:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
  1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

EMQ X Redis数据持久化怎么实现

  1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

持久化发布消息

配置项

打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

获取离线消息

配置项

打开配置文件,配置 Backend 规则:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一对一离线消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一对多离线消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

使用示例

MQTT 离线消息需满足以下条件:

  1. 以 clean_session = false 连接

  2. 订阅 QoS > 0

  3. 发布 QoS > 0

在 EMQ X 管理控制台中以如下配置建立连接,

EMQ X Redis数据持久化怎么实现

持久化 Retain 消息

配置项

打开配置文件,配置 Backend 规则:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

以上是EMQ X Redis資料持久化怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除