首頁 >Java >java教程 >SpringbootRocketMQ怎麼實現廣播訊息

SpringbootRocketMQ怎麼實現廣播訊息

PHPz
PHPz轉載
2023-05-11 20:13:161187瀏覽

RocketMQ訊息模式主要有兩種:廣播模式、叢集模式(負載平衡模式)

廣播模式是每個消費者,都會消費訊息;

負載平衡模式是每一個消費只會被某一個消費者消費一次;

我們業務上一般用的是負載平衡模式,當然一些特殊場景需要用到廣播模式,比如發送一個訊息到郵箱,手機,站內提示;

我們可以透過@RocketMQMessageListenermessageModel屬性值來設置,MessageModel.BROADCASTING是廣播模式,MessageModel.CLUSTERINGING 是預設叢集負載平衡模式

下面來介紹下springboot rockermq 整合實作廣播訊息

  • 建立Springboot項目,加入rockermq 依賴

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • 設定rocketmq

## 埠
server:
  port: 8083

#設定rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生產者
  producer:
    #生產者群組名,規定在一個應用程式裡面必須唯一
 
    #生產者群組名稱,規定在一個應用程式裡面必須唯一
    group : group1
    #訊息傳送的逾時時間預設3000ms
    send-message-timeout: 3000
    #訊息達到4096位元組的時候,訊息就會被壓縮。預設4096
    compress-message-body-threshold: 4096
    #最大的訊息限制,預設為128K
#    max-message-size: 4194304
    max-message-size: 4194304
    重複發送訊息次數#同步傳送訊息# retry-times-when-send-failed: 3
    #在內部傳送失敗時是否重試其他代理,這個參數在有多個broker時才生效
    retry-next-server: true

    #非同步訊息發送失敗重試的次數
    retry-times-when-send-async-failed: 3
  • 生產端:新一個controller 來做訊息

    生產端以正常發送邏輯發送訊息即可
  • package com.example.springbootrocketdemo.controller;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * 广播消息
     * @author qzz
     */
    @RestController
    public class RocketMQBroadCOntroller {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 发送广播消息
         */
        @RequestMapping("/testBroadSend")
        public void testSyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            for(int i=0;i<10;i++){
                rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
            }
        }
    }

  • #建立兩個消費者來消費訊息

#我們先叢集負載平衡測試,加上messageModel=MessageModel.CLUSTERING

消費者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}
    消費者2: 與消費者1在同一個consumerGroup 和topic
  • package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("集群模式 消费者2,消费消息:"+s);
        }
    }

  • 啟動服務,測試群集模式消費

#叢集模式測試: 兩個消費者平攤訊息SpringbootRocketMQ怎麼實現廣播訊息

  • 把上面兩個消費者的messageModel 屬性值修改成廣播模式

消費者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息1 广播模式,消费消息:"+s);
    }
}
    消費者2: 與消費者1在同一個consumerGroup 和topic
  • package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("广播消息2 广播模式,消费消息:"+s);
        }
    }

  • #重啟服務,測試廣播模式消費

SpringbootRocketMQ怎麼實現廣播訊息

###################################################################################### ##

以上是SpringbootRocketMQ怎麼實現廣播訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除