>Java >java지도 시간 >RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

WBOY
WBOY앞으로
2023-05-18 17:19:061792검색

springboot+rockermq는 간단한 메시지 송수신을 실현합니다

일반 메시지를 보내는 방법에는 단방향 전송, 동기 전송, 비동기 전송의 세 가지가 있습니다.

다음은 일반 메시지의 송수신을 실현하기 위한 springboot+rockermq의 통합을 소개합니다

  • Springboot 프로젝트 생성 및 rockermq 종속성 추가

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • rocketmq 구성

# 포트
server :
포트: 8083

# 로켓mq 구성
rocketmq:
name-server: 127.0.0.1:9876
#Producer
producer:
#Producer 그룹 이름은 애플리케이션에서 고유해야 합니다
그룹: group1
#Timeout for 메시지 보내기 기본 시간은 3000ms
send-message-timeout: 3000
# 메시지가 4096바이트에 도달하면 메시지가 압축됩니다. 기본값은 4096
압축 메시지 본문 임계값: 4096
#최대 메시지 제한, 기본값은 128K
최대 메시지 크기: 4194304
# 실패한 동기화 메시지 전송에 대한 재시도 횟수
retry-times-when-send-failed : 3
#내부 전송 실패 시 다른 에이전트를 재시도할지 여부, 이 매개변수는 브로커가 여러 개 있는 경우에만 적용됩니다.
retry-next-server: true
#비동기 메시지 전송 실패 시 재시도 횟수 retry-times-when-send -async-failed: 3

  • 메시지를 보낼 새 컨트롤러 만들기:

  • package com.example.springbootrocketdemo.controller;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * 普通信息的三种方式:同步、异步、单向
     * @author qzz
     */
    @RestController
    public class RocketMQCOntroller {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 发送普通消息
         * convertAndSend(String destination, Object payload) 发送字符串比较方便
         */
        @RequestMapping("/send")
        public void send(){
            rocketMQTemplate.convertAndSend("test-topic","test-message");
        }
        /**
         * 发送同步消息
         */
        @RequestMapping("/testSyncSend")
        public void testSyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
            System.out.println(sendResult);
        }
        /**
         * 发送异步消息
         */
        @RequestMapping("/testASyncSend")
        public void testASyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            //参数三:回调
            rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("消息发送异常");
                    throwable.printStackTrace();
                }
            });
        }
        /**
         * 发送单向消息
         */
        @RequestMapping("/testOneWay")
        public void testOneWay(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
        }
    }
SpringBoot는 다양한 형식으로 메시지를 보내는 데 사용할 수 있는 RocketMQTemplate 템플릿 클래스를 제공합니다.

보내는 방법은 주제 테스트-주제를 지정합니다.

  • RocketMQConsumerListener를 수신하고, 메시지를 수신하고, 메시지를 소비하는 새 메시지 소비자를 생성합니다.

  • package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 消费消息
     * 配置RocketMQ监听
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test",topic = "test-topic")
    public class RocketMQConsumerListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("消费消息:"+s);
        }
    }
소비자 클래스는 RocketMQListener 인터페이스를 구현하고 메시지 유형 문자열을 동적으로 지정해야 합니다.

RocketMQListener接口,以及动态指定消息类型String。

类上要加上@RocketMQMessageListener注解클래스에 @RocketMQMessageListener 주석을 추가하고 토픽 토픽 test-topic을 지정하고 소비자 그룹 테스트

간단한 메시지 송수신이 완료됩니다!

  • 서비스 시작 및 메시지 소비 테스트

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

동기 메시지 테스트:

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

비동기 메시지 테스트:

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

단방향 메시지 테스트:

RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?

위 내용은 RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제