Heim >Datenbank >Redis >So implementieren Sie EMQ X Redis-Datenpersistenz

So implementieren Sie EMQ X Redis-Datenpersistenz

WBOY
WBOYnach vorne
2023-06-02 11:43:371859Durchsuche

Einführung in EMQ, AWS DynamoDB und andere verschiedene Datenbanken zur schnellen Abfrage durch externe Dienste oder zur Beibehaltung des aktuellen Betriebsstatus, wenn der Dienst ausgefallen ist oder der Client ungewöhnlich offline ist, und zur Wiederherstellung des vorherigen Status, wenn die Verbindung wiederhergestellt ist kann für Client-Agent-Abonnements und Geräte-Clients verwendet werden. Wenn das Persistenzmodul online ist, lädt es das voreingestellte Thema direkt aus der Datenbank und schließt das Proxy-Abonnement ab, wodurch die Komplexität des Systemdesigns und der Kommunikationsaufwand für Client-Abonnements verringert werden.

Benutzer können ähnliche Funktionen auch durch das Abonnieren verwandter Themen erreichen, aber die in der Unternehmensversion integrierte Persistenzunterstützung ist effizienter und zuverlässiger, was die Arbeitsbelastung des Entwicklers erheblich reduziert und die Systemstabilität verbessert.

Datenpersistenz ist eine wichtige Funktion von EMQ X und wird nur in der Enterprise-Version unterstützt.

Persistenzdesign

Das Prinzip der Persistenz besteht darin, die Verarbeitungsfunktion (Aktion) aufzurufen, wenn der konfigurierte Ereignis-Hook ausgelöst wird. Nachdem die Verarbeitungsfunktion die entsprechenden Daten erhalten hat, verarbeitet sie diese gemäß den konfigurierten Anweisungen, um das Hinzufügen zu realisieren. Löschen, Ändern und Abfragen von Daten. Die für denselben Event-Hook in verschiedenen Datenbanken verfügbaren Parameter sind dieselben, die Verarbeitungsfunktion (Aktion) unterscheidet sich jedoch aufgrund unterschiedlicher Datenbankeigenschaften. Das gesamte Persistenz-Arbeitsmodell und der Prozess sind wie folgt:

Einzelne Nachrichtenspeicherung

EMQ X Redis数据持久化怎么实现

    Veröffentlichungsseite veröffentlicht eine Nachricht;
  1. Backend zeichnet die Nachricht in der Datenbank auf;
  2. Abonnieren Seite abonniert das Thema;
  3. Backend ruft die Nachricht des Themas aus der Datenbank ab;
  4. Backend entfernt die Nachricht aus der Datenbank,
  5. One-to-many-Nachrichtenspeicher

PUB veröffentlicht eine Nachricht;

EMQ X Redis数据持久化怎么实现

Backend zeichnet die Nachricht in der Datenbank auf;
  1. SUB1 und SUB2 abonnieren das Thema;
  2. Das Backend ruft die Nachricht des Themas aus der Datenbank ab.
  3. Senden Sie Nachrichten an SUB1 und SUB2.
  4. Das Backend zeichnet die gelesenen Nachrichtenpositionen von SUB1 und SUB2 auf, und das nächste Mal, wenn Sie Nachrichten erhalten, beginnt es an dieser Position.
  5. Redis-Datenpersistenz
  6. Dieser Artikel veranschaulicht anhand praktischer Beispiele, wie relevante Informationen über Redis gespeichert werden.

    Redis ist eine vollständig quelloffene, kostenlose, leistungsstarke Schlüsselwertdatenbank, die dem BSD-Protokoll entspricht.
Im Vergleich zu anderen Schlüsselwert-Cache-Produkten weist Redis die folgenden Merkmale auf:

Redis verfügt über eine extrem hohe Leistung und eine einzelne Maschine unterstützt eine Lese- und Schreibgeschwindigkeit von 100.000 Ebenen.

Redis unterstützt die Datenpersistenz. Es kann die Daten im Speicher auf der Festplatte speichern und zur Verwendung beim Neustart erneut laden.
  • Redis unterstützt nicht nur einfache Daten vom Typ Schlüsselwert, sondern bietet auch die Speicherung von Datenstrukturen wie Liste, Menge, Zset und Hash.
  • Redis unterstützt die Datensicherung, also die Datensicherung im Master-Slave-Modus.
  • Leser können auf den offiziellen Redis-Schnellstart verweisen, um Redis zu installieren (beim Schreiben dieses Artikels ist die Redis-Version 5.0) und den Redis-Server über den Befehl redis-server zu starten.
  • Konfigurieren Sie den EMQ X-Server

    Für EMQ. Einige Konfigurationen müssen nicht geändert werden. Das einzige, was geändert werden muss, ist möglicherweise die Adresse des Redis-Servers: Wenn sich das vom Leser installierte Redis nicht auf demselben Server wie EMQ X befindet, geben Sie bitte die korrekte Adresse und den richtigen Port des Redis-Servers an. Wie unten gezeigt:
## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379
Lassen Sie die verbleibenden Konfigurationsdateien unverändert und starten Sie dann das Plug-in:

emqx_ctl plugins load emqx_backend_redis
redis-server 命令来启动 Redis 服务器。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:

## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

保持剩下部分的配置文件不变,然后启动该插件:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

客户端在线状态存储

客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。

尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。

配置项

打开配置文件,配置 Backend 规则:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

使用示例

浏览器打开 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:

EMQ X Redis数据持久化怎么实现

打开 redis-cli 命令行窗口,执行命令 keys *Client-Online-Statusspeicher

🎜Wenn der Client online und offline geht, aktualisieren Sie den Online-Status, die Online- und Offline-Zeit und Knoten-Client-Liste in die Redis-Datenbank. 🎜🎜Obwohl EMQ 🎜🎜Konfigurationselement🎜🎜Öffnen Sie die Konfigurationsdatei und konfigurieren Sie Backend-Regeln: 🎜
## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"
🎜Anwendungsbeispiel🎜🎜Öffnen Sie den Browser http://127.0.0.1:18083 EMQ X-Verwaltungskonsole in Tools -> Websocket Erstellen Sie eine neue Client-Verbindung und geben Sie die Client-ID als sub_client an:🎜🎜So implementieren Sie EMQ /code >, das Ergebnis ist wie folgt, der Leser kann sehen, dass in Redis zwei Schlüssel gespeichert sind: 🎜<pre class="brush:php;toolbar:false">127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"</pre><h4>连接列表</h4><p>插件以 <code>mqtt:node:{node_name}</code> 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:</p><pre class="brush:php;toolbar:false">## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836</pre><p>字段说明:</p><pre class="brush:php;toolbar:false">## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"</pre><h4>连接详细信息</h4><p>插件以 <code>mqtt:client:{client_id}</code> 格式的 key 记录客户端在线状态、上线时间,等效操作:</p><pre class="brush:php;toolbar:false">## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854</pre><p>字段说明:</p><pre class="brush:php;toolbar:false">## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳</pre><h3>客户端代理订阅</h3><p>当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。</p><h4>配置项</h4><p>打开配置文件,配置 Backend 规则:</p><pre class="brush:php;toolbar:false">## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}</pre><h4>使用示例</h4><p>当 <code>sub_client</code> 设备上线时,需要为其订阅 <code>sub_client/upstream</code> 与 <code>sub_client/downlink</code> 两个 QoS 1 的主题:</p><ol class=
  • 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

  • ## redis key 为 mqtt:sub:{client_id}
    ## HSET key {topic} {qos}
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
    (integer) 0
    
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
    (integer) 0
    1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

    EMQ X Redis数据持久化怎么实现

    1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

    持久化发布消息

    配置项

    打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

    ## hook: message.publish
    ## action/function: on_message_publish
    
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    使用示例

    在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

    消息列表

    EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

    ## 获取 upstream_topic 主题集合中所有 message id
    127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"
    2) "2VFszTClyjpVtLDLrn1u"
    3) "2VFszozkwkYOcbEy8QN9"
    4) "2VFszpEc7DfbEqC97I3g"
    5) "2VFszpSzRviADmcOeuXd"
    6) "2VFszpm3kvvLkJTcdmGU"
    7) "2VFt0kuNrOktefX6m4nP"
    127.0.0.1:6379>

    消息详情

    每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

    ## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
     1) "id"
     2) "2VFt0kuNrOktefX6m4nP" ## message id
     3) "from"
     4) "sub_client" ## client id
     5) "qos"
     6) "2"
     7) "topic"
     8) "up/upstream_topic"
     9) "payload"
    10) "{ "cmd": "reboot" }"
    11) "ts"
    12) "1542338754" ## pub 时间戳
    13) "retain"
    14) "false"

    获取离线消息

    配置项

    打开配置文件,配置 Backend 规则:

    ## hook: session.subscribed
    ## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub
    
    ## 一对一离线消息
    backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    
    ## 一对多离线消息
    backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

    使用示例

    MQTT 离线消息需满足以下条件:

    1. 以 clean_session = false 连接

    2. 订阅 QoS > 0

    3. 发布 QoS > 0

    在 EMQ X 管理控制台中以如下配置建立连接,

    EMQ X Redis数据持久化怎么实现

    持久化 Retain 消息

    配置项

    打开配置文件,配置 Backend 规则:

    ## hook: message.publish
    ## action/function: on_client_connected、on_message_retain
    
    backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    
    backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    消息列表

    EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

    ## 获取 upstream_topic 主题集合中所有 message id
    127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"
    127.0.0.1:6379>

    消息详情

    每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

    ## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
     1) "id"
     2) "2VFt0kuNrOktefX6m4nP" ## message id
     3) "from"
     4) "sub_client" ## client id
     5) "qos"
     6) "2"
     7) "topic"
     8) "up/upstream_topic"
     9) "payload"
    10) "{ "cmd": "reboot" }"
    11) "ts"
    12) "1542338754" ## pub 时间戳
    13) "retain"
    14) "false"

    Das obige ist der detaillierte Inhalt vonSo implementieren Sie EMQ X Redis-Datenpersistenz. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen