>  기사  >  Java  >  Springboot의 RocketMQ에서 브로드캐스트 메시지를 구현하는 방법

Springboot의 RocketMQ에서 브로드캐스트 메시지를 구현하는 방법

PHPz
PHPz앞으로
2023-05-11 20:13:161119검색

RocketMQ 메시지 모드에는 브로드캐스트 모드와 클러스터 모드(로드 밸런싱 모드)의 두 가지가 있습니다.

브로드캐스트 모드는 모든 소비자가 메시지를 소비한다는 것을 의미합니다.

로드 밸런싱 모드는 각 소비가 특정 소비자에 의해 한 번만 소비된다는 것을 의미합니다.

우리는 일반적으로 비즈니스에서 로드 밸런싱 모드를 사용합니다. 물론 이메일, 휴대폰 또는 현장 프롬프트로 메시지를 보내는 등 일부 특수한 시나리오에서는 브로드캐스트 모드를 사용해야 합니다. 은 기본 클러스터 로드 밸런싱 모드입니다

브로드캐스트 메시지를 구현하기 위해 springboot+rockermq 통합을 도입하겠습니다@RocketMQMessageListenermessageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING

Springboot 프로젝트 생성 및 rockermq 종속성 추가
  • <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>

rocketmq 구성
  • # 포트
서버:
포트: 8083


# 구성 Rocketmq
rocketmq:

name-server: 127.0.0.1:9876

#Producer
producer:
# Producer 그룹 이름은 애플리케이션에서 고유해야 합니다
group: group1
# 기본 시간 제한 메시지 전송 시간은 3000ms
send -message-timeout: 3000
# 메시지가 4096바이트에 도달하면 메시지가 압축됩니다. 기본값은 4096
압축 메시지 본문 임계값: 4096
#최대 메시지 제한, 기본값은 128K
최대 메시지 크기: 4194304
# 실패한 동기화 메시지 전송에 대한 재시도 횟수
retry-times-when-send-failed : 3
#내부 전송 실패 시 다른 에이전트를 재시도할지 여부, 이 매개변수는 브로커가 여러 개 있는 경우에만 적용됩니다.
retry-next-server: true
#비동기 메시지 전송 실패 시 재시도 횟수 retry-times-when-send -async-failed: 3



프로덕션 측: 메시지를 보내기 위한 새 컨트롤러 생성
  • 프로덕션 측에서는 일반적인 전송 로직에 따라 메시지를 보낼 수 있습니다

    package com.example.springbootrocketdemo.controller;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * 广播消息
     * @author qzz
     */
    @RestController
    public class RocketMQBroadCOntroller {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 发送广播消息
         */
        @RequestMapping("/testBroadSend")
        public void testSyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            for(int i=0;i<10;i++){
                rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
            }
        }
    }

두 개의 소비자를 생성하여 메시지 소비

  • 먼저 클러스터 로드 밸런싱 테스트를 수행하고 messageModel=MessageModel.CLUSTERING

  • Consumer 1:
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

Consumer 2: Consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}

과 동일한 ConsumerGroup 및 주제에서 서비스를 시작하고 클러스터 모드를 테스트합니다. Consumption

  • 클러스터 모드 테스트: 두 Consumer가 메시지를 동등하게 공유

Springboot의 RocketMQ에서 브로드캐스트 메시지를 구현하는 방법위 두 Consumer의 messageModel 속성 값을 브로드캐스트 모드로 변경

  • Consumer 1:

    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
    public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("广播消息1 广播模式,消费消息:"+s);
        }
    }
  • Consumer 2: In the 소비자와 동일한 소비자 그룹 및 주제 1
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息2 广播模式,消费消息:"+s);
    }
}

서비스를 다시 시작하고 방송 모드 소비를 테스트

위 내용은 Springboot의 RocketMQ에서 브로드캐스트 메시지를 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제