Home  >  Article  >  Database  >  How to implement EMQ X Redis data persistence

How to implement EMQ X Redis data persistence

WBOY
WBOYforward
2023-06-02 11:43:371791browse

EMQ , PostgreSQL, MongoDB, Cassandra, AWS DynamoDB and other various databases for quick query by external services or to retain the current running state when the service is down or the client is abnormally offline, and the previous state is restored when the connection is restored; persistence can also be used on the client Agent subscription: when the device client goes online, the persistence module directly loads the preset topic from the database and completes the agent subscription, reducing the complexity of the system design and the communication overhead of client subscription.

Users can also achieve similar functions by subscribing to related topics, but the persistence support built into the enterprise version is more efficient and reliable, greatly reducing the work of developers. measure and improve system stability.

Data persistence is an important feature of EMQ X and is only supported in the enterprise version.

Persistence design

The persistence principle is to call the processing function (action) when the configured event hook is triggered. After the processing function obtains the corresponding data, it processes it according to the configured instructions to achieve Add, delete, modify and check data. The parameters available for the same event hook in different databases are the same, but the processing function (action) differs due to different database characteristics. The entire persistence working model and process are as follows:

One-to-one message storage

EMQ X Redis数据持久化怎么实现

##Publish side publishes a message;
  1. Backend records the message in the database;
  2. Subscribe subscribes to the topic;
  3. Backend gets it from the database Message of this topic;
  4. Send the message to the Subscriber;
  5. Backend removes the message from the database after the Subscriber confirms it;
  6. One-to-many message storage

EMQ X Redis数据持久化怎么实现

PUB side publishes a message;
  1. Backend records the message in the database;
  2. SUB1 and SUB2 subscribe to the topic;
  3. Backend gets the topic from the database Message;
  4. Send a message to SUB1 and SUB2;
  5. Backend records the position of the message read by SUB1 and SUB2, and gets the message from this position next time start.
  6. Redis Data Persistence
This article uses practical examples to illustrate how to store relevant information through Redis.

Redis is a completely open source, free, high-performance key-value database that complies with the BSD protocol.

Compared with other key-value cache products, Redis has the following characteristics:

Redis has extremely high performance, and a single machine supports a read and write speed of 100,000 levels.
  • Redis supports data persistence. It can save the data in the memory to the disk and load it again for use when restarting.
  • Redis not only supports simple key-value type data, but also provides storage of data structures such as list, set, zset, and hash.
  • Redis supports data backup, that is, data backup in master-slave mode.
  • Readers can refer to the official Redis Quick Start to install Redis (when writing this article, the Redis version is 5.0), and start the Redis server through the
  • redis-server
command .

Configuring EMQ X server

For EMQ Redis persistence function, most configurations do not need to be changed. The only thing that needs to be changed may be the address of the Redis server: if the Redis installed by the reader is not on the same server as EMQ X, please specify the correct address and port of the Redis server. As shown below:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

Keep the remaining configuration files unchanged, and then start the plug-in:

emqx_ctl plugins load emqx_backend_redis
Client online status storageWhen the client goes online and offline, Update the online status, online and offline time, and node client list to the Redis database.

Although EMQ

Configuration items

Open the configuration file and configure Backend rules:

## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
Usage example

Open the browser

http://127.0.0.1 :18083

EMQ

Open the

redis-cli command line window and execute the command keys *. The results are as follows. Readers can see that two keys are stored in Redis:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

连接列表

插件以 mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

字段说明:

## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"

连接详细信息

插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

字段说明:

## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳

客户端代理订阅

当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

配置项

打开配置文件,配置 Backend 规则:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
  1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

EMQ X Redis数据持久化怎么实现

  1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

持久化发布消息

配置项

打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

获取离线消息

配置项

打开配置文件,配置 Backend 规则:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一对一离线消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一对多离线消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

使用示例

MQTT 离线消息需满足以下条件:

  1. 以 clean_session = false 连接

  2. 订阅 QoS > 0

  3. 发布 QoS > 0

在 EMQ X 管理控制台中以如下配置建立连接,

EMQ X Redis数据持久化怎么实现

持久化 Retain 消息

配置项

打开配置文件,配置 Backend 规则:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

The above is the detailed content of How to implement EMQ X Redis data persistence. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete