>  기사  >  데이터 베이스  >  EMQ X Redis 데이터 지속성을 구현하는 방법

EMQ X Redis 데이터 지속성을 구현하는 방법

WBOY
WBOY앞으로
2023-06-02 11:43:371804검색

EMQ, AWS DynamoDB 및 기타 다양한 데이터베이스를 도입하여 외부 서비스에 의한 빠른 쿼리를 수행하거나 서비스가 다운되거나 클라이언트가 비정상적으로 오프라인일 때 현재 실행 상태를 유지하고 연결 지속성이 복원될 때 이전 상태로 복원할 수도 있습니다. 클라이언트 에이전트 구독, 장치 클라이언트에 사용됩니다. 온라인일 때 지속성 모듈은 데이터베이스에서 미리 설정된 주제를 직접 로드하고 프록시 구독을 완료하여 시스템 설계의 복잡성과 클라이언트 구독 통신 오버헤드를 줄입니다.

사용자는 관련 주제를 구독하여 비슷한 기능을 수행할 수도 있지만, 엔터프라이즈 버전에 내장된 지속성 지원이 더 효율적이고 안정적이어서 개발자의 작업량을 크게 줄이고 시스템 안정성을 향상시킵니다.

데이터 지속성은 EMQ X의 중요한 기능이며 엔터프라이즈 버전에서만 지원됩니다.

지속성 설계

지속성의 원리는 구성된 이벤트 후크가 트리거될 때 처리 기능(액션)을 호출하는 것입니다. 처리 기능은 해당 데이터를 얻은 후 구성된 지침에 따라 처리하여 추가를 실현합니다. 데이터 삭제, 수정 및 쿼리. 서로 다른 데이터베이스에서 동일한 이벤트 후크에 사용할 수 있는 매개변수는 동일하지만 데이터베이스 특성이 다르기 때문에 처리 기능(작업)이 다릅니다. 전체 지속성 작업 모델 및 프로세스는 다음과 같습니다.

일대일 메시지 저장

EMQ X Redis数据持久化怎么实现

    게시 측에서 메시지를 게시합니다.
  1. 백엔드는 메시지를 데이터베이스에 기록합니다.
  2. 백엔드는 데이터베이스에서 주제의 메시지를 받습니다.
  3. 구독자가 확인한 후
  4. 백엔드는 메시지를 데이터베이스에서 제거합니다.
  5. 일대다 메시지 저장

PUB는 메시지를 게시합니다.

EMQ X Redis数据持久化怎么实现

백엔드는 메시지를 데이터베이스에 기록합니다.
  1. SUB2는 주제를 구독합니다.
  2. 백엔드는 데이터베이스에서 주제의 메시지를 가져옵니다.

    SUB1 및 SUB2에 메시지 보내기
  3. 백엔드는 SUB1 및 SUB2의 읽은 메시지 위치를 기록하고 다음에 메시지를 받을 때 이 위치에서 시작합니다.
  4. Redis 데이터 지속성
  5. 이 문서에서는 실제 예제를 사용하여 Redis를 통해 관련 정보를 저장하는 방법을 설명합니다.

    Redis는 BSD 프로토콜을 준수하는 완전 오픈 소스, 무료, 고성능 키-값 데이터베이스입니다.
  6. 다른 키-값 캐시 제품과 비교하여 Redis는 다음과 같은 특징을 가지고 있습니다.

Redis는 매우 높은 성능을 가지며 단일 머신은 100,000 레벨의 읽기 및 쓰기 속도를 지원합니다.

Redis는 메모리의 데이터를 디스크에 저장했다가 다시 로드하여 다시 시작할 수 있도록 지원합니다.

Redis는 단순한 키-값 유형의 데이터를 지원할 뿐만 아니라 list, set, zset, hash와 같은 데이터 구조의 저장도 제공합니다.
  • Redis는 데이터 백업, 즉 마스터-슬레이브 모드에서의 데이터 백업을 지원합니다.
  • 독자는 공식 Redis Quick Start를 참조하여 Redis를 설치하고(이 글을 작성할 당시 Redis 버전은 5.0임) redis-server 명령을 통해 Redis 서버를 시작할 수 있습니다.
  • EMQ X 서버 구성

    EMQ의 경우 일부 구성은 변경할 필요가 없습니다. 변경해야 할 유일한 것은 Redis 서버의 주소일 수 있습니다. 리더가 설치한 Redis가 EMQ X와 동일한 서버에 없는 경우 Redis 서버의 올바른 주소와 포트를 지정하십시오. 아래와 같이:
## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379
  • 나머지 구성 파일을 변경하지 않고 유지한 다음 플러그인을 시작합니다.

    emqx_ctl plugins load emqx_backend_redis
    클라이언트 온라인 상태 저장
  • 클라이언트가 온라인 및 오프라인이 되면 온라인 상태, 온라인 및 오프라인 시간 및 노드 클라이언트 목록을 Redis 데이터베이스에 추가합니다.

    redis-server 命令来启动 Redis 服务器。

    配置 EMQ X 服务器

    通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:

    ## 上线
    backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}
    
    ## 下线
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

    保持剩下部分的配置文件不变,然后启动该插件:

    127.0.0.1:6379> keys *
    1) "mqtt:node:emqx@127.0.0.1"
    2) "mqtt:client:sub_client"

    客户端在线状态存储

    客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。

    尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。

    配置项

    打开配置文件,配置 Backend 规则:

    ## redis key 为 mqtt:node:{node_name}
    HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

    使用示例

    浏览器打开 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:

    EMQ X Redis数据持久化怎么实现

    打开 redis-cli 命令行窗口,执行命令 keys *EMQ이긴 하지만

    🎜구성 항목🎜🎜구성 파일을 열고 백엔드 규칙을 구성합니다: 🎜
    ## 节点下在线设备信息
    127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
    1) "sub_client1" # clientid
    2) "1542272836" # 上线时间时间戳
    3) "sub_client"
    4) "1542272836"
    🎜사용 예🎜🎜http://127.0.0.1:18083 EMQ X 관리 콘솔에서 브라우저를 엽니다. 도구 -> Websocket 새 클라이언트 연결을 만들고 클라이언트 ID를 sub_client로 지정합니다.🎜🎜EMQ X Redis 데이터 지속성을 구현하는 방법🎜🎜redis-cli 명령줄 창을 열고 keys * 결과는 다음과 같습니다. 독자는 Redis에 두 개의 키가 저장되어 있음을 알 수 있습니다. 🎜<pre class="brush:php;toolbar:false">127.0.0.1:6379&gt; keys * 1) &quot;mqtt:node:emqx@127.0.0.1&quot; 2) &quot;mqtt:client:sub_client&quot;</pre> <h4>连接列表</h4> <p>插件以 <code>mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

    ## redis key 为 mqtt:node:{node_name}
    HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

    字段说明:

    ## 节点下在线设备信息
    127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
    1) "sub_client1" # clientid
    2) "1542272836" # 上线时间时间戳
    3) "sub_client"
    4) "1542272836"

    连接详细信息

    插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

    ## redis key 为 mqtt:client:{client_id}
    HMSET mqtt:client:sub_client state 1 online_at 1542272854

    字段说明:

    ## 客户端在线状态
    127.0.0.1:6379> HGETALL mqtt:client:sub_client
    1) "state"
    2) "0" # 0 离线 1 在线
    3) "online_at"
    4) "1542272854" # 上线时间戳
    5) "offline_at"
    6) "undefined" # 离线时间戳

    客户端代理订阅

    当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

    配置项

    打开配置文件,配置 Backend 规则:

    ## hook: client.connected
    ## action/function: on_subscribe_lookup
    backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

    使用示例

    sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

    1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

    ## redis key 为 mqtt:sub:{client_id}
    ## HSET key {topic} {qos}
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
    (integer) 0
    
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
    (integer) 0
    1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

    EMQ X Redis数据持久化怎么实现

    1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

    持久化发布消息

    配置项

    打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

    ## hook: message.publish
    ## action/function: on_message_publish
    
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    使用示例

    在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

    消息列表

    EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

    ## 获取 upstream_topic 主题集合中所有 message id
    127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"
    2) "2VFszTClyjpVtLDLrn1u"
    3) "2VFszozkwkYOcbEy8QN9"
    4) "2VFszpEc7DfbEqC97I3g"
    5) "2VFszpSzRviADmcOeuXd"
    6) "2VFszpm3kvvLkJTcdmGU"
    7) "2VFt0kuNrOktefX6m4nP"
    127.0.0.1:6379>

    消息详情

    每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

    ## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
     1) "id"
     2) "2VFt0kuNrOktefX6m4nP" ## message id
     3) "from"
     4) "sub_client" ## client id
     5) "qos"
     6) "2"
     7) "topic"
     8) "up/upstream_topic"
     9) "payload"
    10) "{ "cmd": "reboot" }"
    11) "ts"
    12) "1542338754" ## pub 时间戳
    13) "retain"
    14) "false"

    获取离线消息

    配置项

    打开配置文件,配置 Backend 规则:

    ## hook: session.subscribed
    ## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub
    
    ## 一对一离线消息
    backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    
    ## 一对多离线消息
    backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

    使用示例

    MQTT 离线消息需满足以下条件:

    1. 以 clean_session = false 连接

    2. 订阅 QoS > 0

    3. 发布 QoS > 0

    在 EMQ X 管理控制台中以如下配置建立连接,

    EMQ X Redis数据持久化怎么实现

    持久化 Retain 消息

    配置项

    打开配置文件,配置 Backend 规则:

    ## hook: message.publish
    ## action/function: on_client_connected、on_message_retain
    
    backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    
    backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    消息列表

    EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

    ## 获取 upstream_topic 主题集合中所有 message id
    127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"
    127.0.0.1:6379>

    消息详情

    每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

    ## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
     1) "id"
     2) "2VFt0kuNrOktefX6m4nP" ## message id
     3) "from"
     4) "sub_client" ## client id
     5) "qos"
     6) "2"
     7) "topic"
     8) "up/upstream_topic"
     9) "payload"
    10) "{ "cmd": "reboot" }"
    11) "ts"
    12) "1542338754" ## pub 时间戳
    13) "retain"
    14) "false"

    위 내용은 EMQ X Redis 데이터 지속성을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제