Maison >base de données >Redis >Comment implémenter la persistance des données EMQ X Redis

Comment implémenter la persistance des données EMQ X Redis

WBOY
WBOYavant
2023-06-02 11:43:371859parcourir

Introduction à EMQ, AWS DynamoDB et d'autres bases de données diverses pour une requête rapide par des services externes ou conserver l'état d'exécution actuel lorsque le service est en panne ou que le client est anormalement hors ligne, et restaurer l'état précédent lorsque la connexion est restaurée. être utilisé pour les abonnements d'agent client, les clients d'appareil. Lorsqu'il est en ligne, le module de persistance charge directement le sujet prédéfini à partir de la base de données et termine l'abonnement proxy, réduisant ainsi la complexité de la conception du système et la surcharge de communication des abonnements clients.

Les utilisateurs peuvent également obtenir des fonctions similaires en s'abonnant à des sujets connexes, mais le support de persistance intégré à la version entreprise est plus efficace et fiable, réduisant considérablement la charge de travail du développeur et améliorant la stabilité du système.

La persistance des données est une fonctionnalité importante d'EMQ X et n'est prise en charge que dans la version entreprise.

Conception de persistance

Le principe de la persistance est d'appeler la fonction de traitement (action) lorsque le hook d'événement configuré est déclenché. Une fois que la fonction de traitement a obtenu les données correspondantes, elle les traite selon les instructions configurées pour réaliser l'ajout, suppression, modification et interrogation des données. Les paramètres disponibles pour le même hook d'événement dans différentes bases de données sont les mêmes, mais la fonction de traitement (action) diffère en raison des différentes caractéristiques de la base de données. L'ensemble du modèle de travail et du processus de persistance est le suivant :

Stockage des messages individuels

EMQ X Redis数据持久化怎么实现

    Le côté publication publie un message
  1. Le backend enregistre le message dans la base de données ; le côté s'abonne au sujet ;
  2. Backend récupère le message du sujet de la base de données ;
  3. envoie le message à l'abonné ;
  4. Backend supprime le message de la base de données après que l'abonné l'a confirmé ;
  5. Stockage des messages un à plusieurs

PUB publie un message ;

EMQ X Redis数据持久化怎么实现

Backend enregistre le message dans la base de données
  1. SUB1 et SUB2 s'abonnent au sujet ;
  2. Le backend récupère le message du sujet de la base de données ;

  3. Envoyer des messages à SUB1 et SUB2 ;

  4. Backend enregistre les positions des messages lus de SUB1 et SUB2, et la prochaine fois que vous recevrez des messages, il commencera à partir de cette position.

  5. Persistance des données Redis

    Cet article utilise des exemples pratiques pour illustrer comment stocker des informations pertinentes via Redis.
  6. Redis est une base de données clé-valeur entièrement open source, gratuite et hautes performances, conforme au protocole BSD.

    Par rapport à d'autres produits de cache clé-valeur, Redis présente les caractéristiques suivantes :

Redis a des performances extrêmement élevées et une seule machine prend en charge une vitesse de lecture et d'écriture de 100 000 niveaux.

Redis prend en charge la persistance des données. Il peut enregistrer les données en mémoire sur le disque et les charger à nouveau pour les utiliser lors du redémarrage.

  • Redis prend non seulement en charge les données simples de type clé-valeur, mais fournit également le stockage de structures de données telles que la liste, l'ensemble, le zset et le hachage.

  • Redis prend en charge la sauvegarde des données, c'est-à-dire la sauvegarde des données en mode maître-esclave.

  • Les lecteurs peuvent se référer au Quick Start officiel de Redis pour installer Redis (lors de la rédaction de cet article, la version de Redis est 5.0) et démarrer le serveur Redis via la commande redis-server.

    Configurer le serveur EMQ X
  • Pour EMQ Certaines configurations n'ont pas besoin d'être modifiées. La seule chose qui doit être modifiée peut être l'adresse du serveur Redis : si le Redis installé par le lecteur n'est pas sur le même serveur qu'EMQ X, veuillez spécifier l'adresse et le port corrects du serveur Redis. Comme indiqué ci-dessous :

    ## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
    backend.redis.pool1.server = 127.0.0.1:6379
    Conservez les fichiers de configuration restants inchangés, puis démarrez le plug-in :
emqx_ctl plugins load emqx_backend_redis
Stockage de l'état en ligne du client

redis-server 命令来启动 Redis 服务器。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:

## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

保持剩下部分的配置文件不变,然后启动该插件:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

客户端在线状态存储

客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。

尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。

配置项

打开配置文件,配置 Backend 规则:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

使用示例

浏览器打开 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:

EMQ X Redis数据持久化怎么实现

打开 redis-cli 命令行窗口,执行命令 keys *Lorsque le client est en ligne et hors ligne, mettez à jour l'état en ligne, l'heure en ligne et hors ligne, et liste de clients de nœuds à la base de données Redis.

🎜Bien qu'EMQ 🎜🎜Élément de configuration🎜🎜Ouvrez le fichier de configuration et configurez les règles du Backend : 🎜
## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"
🎜Exemple d'utilisation🎜🎜Ouvrez le navigateur http://127.0.0.1:18083 Console de gestion EMQ X, dans Outils -> Websocket Créez une nouvelle connexion client, spécifiez l'ID client comme sub_client :🎜🎜Comment implémenter la persistance des données EMQ X Redis🎜🎜Ouvrez la fenêtre de ligne de commande redis-cli et exécutez la commande keys *, le résultat est le suivant, les lecteurs peuvent voir que deux clés sont stockées dans Redis : 🎜<pre class="brush:php;toolbar:false">127.0.0.1:6379&gt; keys * 1) &quot;mqtt:node:emqx@127.0.0.1&quot; 2) &quot;mqtt:client:sub_client&quot;</pre> <h4>连接列表</h4> <p>插件以 <code>mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

字段说明:

## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"

连接详细信息

插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

字段说明:

## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳

客户端代理订阅

当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

配置项

打开配置文件,配置 Backend 规则:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
  1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

EMQ X Redis数据持久化怎么实现

  1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

持久化发布消息

配置项

打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

获取离线消息

配置项

打开配置文件,配置 Backend 规则:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一对一离线消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一对多离线消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

使用示例

MQTT 离线消息需满足以下条件:

  1. 以 clean_session = false 连接

  2. 订阅 QoS > 0

  3. 发布 QoS > 0

在 EMQ X 管理控制台中以如下配置建立连接,

EMQ X Redis数据持久化怎么实现

持久化 Retain 消息

配置项

打开配置文件,配置 Backend 规则:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer