>  기사  >  Java  >  SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법

SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법

王林
王林앞으로
2023-05-29 22:34:491883검색

    환경 구성

    SpringBootRabbitMQ를 통합하여 메시지 전송을 구현합니다. SpringBoot 整合 RabbitMQ 实现消息的发送。

    1.添加 maven 依赖

           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.添加 application.yml 配置文件

    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx

    3.配置交换机、队列以及绑定

        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }

    4.生产发送消息

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【发送消息】" + message)
            return "【send message】" + message;
        }

    5.消费者接收消息

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);

    6.调用生产端发送消息 hello,控制台输出:

    【发送消息】hello
    【接收信息】hello 当前时间2022-05-12 10:21:14

    说明消息已经被成功接收。

    消息丢失分析

    SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법

    一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

    • 生产端丢失: 生产者无法传输到 RabbitMQ

    • 存储端丢失: RabbitMQ 存储自身挂了

    • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

    RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

    生产阶段

    生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

    配置application.yml

    spring:
      rabbitmq:
        # 消息确认机制 生产者 -> 交换机
        publisher-confirms: true
        # 消息返回机制  交换机 -> 队列
        publisher-returns: true

    配置

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【发送成功】");
                    } else {
                        log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息发送失败】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }

    消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

    消息从 交换机 到 队列,有returnCallback 退回模式。

    发送消息 product message 控制台输出如下:

    【发送消息】product message
    【接收信息】product message 当前时间2022-05-12 11:27:56
    【correlationData】:null
    【ack】true
    【cause】null
    【发送成功】

    生产端模拟消息丢失

    这里有两个方案:

    • 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。

    • 发送不存在的交换机:

    // myExchange 修改成 myExchangexxxxx
    rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

    结果:

    【correlationData】:null
    【ack】false
    【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
    【发送失败】

    当发送失败可以对消息进行重试

    交换机正确,发送不存在的队列:

    交换机接收到消息,返回成功通知,控制台输出:

    【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
    【ack】true
    【cause】null
    【发送成功】

    交换机没有找到队列,返回失败信息:

    【消息发送失败】
    【message】product message
    【replyCode】312

    RabbitMQ

    开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

    修改队列的持久化,修改成非持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }

    发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
    设置队列持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }

    重启之后,队列的消息还存在。

    消费端

    消费端默认开始 ack

    1. maven 종속성 추가

    spring:
      rabbitmq:
        # 手动消息确认
        listener:
          simple:
            acknowledge-mode: manual

    2. application.yml 구성 파일 추가

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    3. 스위치, 대기열 및 바인딩 구성

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);
            System.out.println(message.getMessageProperties().getDeliveryTag());
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    4. 소비자는

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    6라는 메시지를 받습니다. hello 메시지를 보내기 위해 프로덕션 종료를 호출합니다. 콘솔 출력은

    [Send message] hello
    [Receive message] hello입니다. 현재 시간 2022- 05-12 10:21:14

    SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법 메시지가 성공적으로 수신되었음을 나타냅니다.

    메시지 손실 분석

    🎜SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법 🎜🎜메시지 생성부터 소비까지 다음 단계에서 메시지 손실이 발생할 수 있습니다. 🎜
    • 🎜생산 종료 손실: 생산자는 RabbitMQ🎜
    • 🎜저장소 측 손실: RabbitMQ 저장소 자체가 다운되었습니다🎜
    • 🎜소비자 측 손실: 소비자 측에 네트워크 문제로 스토리지를 보낼 수 없거나 소비가 중단되어 정상적인 소비가 전송되지 않습니다🎜
    • 🎜🎜RabbitMQ는 생산 측에서 안정적인 전송을 잘 지원합니다. 저장 끝, 소비자 끝. 🎜🎜제작 단계🎜🎜제작 단계에서는 안정적인 메시지 전송을 보장하기 위해 요청 확인 메커니즘을 사용합니다. RabbitMQ 서버에 메시지를 보낸 후 RabbitMQ는 메시지를 수신하고 요청 확인을 보낸 사람에게 반환하여 RabbitMQ 서버가 메시지를 성공적으로 수신했음을 나타냅니다. 🎜🎜application.yml🎜rrreee🎜구성 🎜rrreee🎜 confirmCallback 확인 모드를 사용하여 Producer에서 switch로의 메시지를 구성합니다. 메시지가 성공적으로 전송된 후 메시지는 confirm(CorrelationData CorrelationData, boolean ack, String cause) 메서드를 호출하고 ack를 기반으로 메시지가 성공적으로 전송되었는지 확인합니다. . 🎜🎜스위치에서 로의 메시지에는 returnCallback 반환 모드가 있습니다. 🎜🎜메시지 보내기 제품 메시지 콘솔 출력은 다음과 같습니다. 🎜
      🎜【메시지 보내기】제품 메시지
      【메시지 받기】제품 메시지 현재 시간 2022-05-12 11: 27: 56
      [correlationData]:null
      [ack]true
      [cause]null
      [성공적으로 전송됨]🎜

      생산 종료 시뮬레이션 메시지 손실됨🎜여기에는 두 가지 해결 방법이 있습니다. 🎜
      • 🎜메시지를 보낸 후 즉시 브로커를 닫습니다. 후자는 네트워크를 종료하지만 브로커가 닫힌 후에는 콘솔은 항상 오류를 보고합니다. 보내기 메시지도 500 오류를 보고했습니다. 🎜
      • 🎜존재하지 않는 스위치 보내기: 🎜
      • 🎜rrreee🎜결과: 🎜
        🎜[correlationData]:null
        [ack]false
        [cause] 채널 오류; 프로토콜 방법: #method(reply-code=404, reply-text=NOT_FOUND - 가상 호스트 '/', class-id=60, method-id=40에서 'myExchangexxxxx' 교환 없음)[전송 실패]🎜
        🎜전송 실패 시 메시지를 다시 시도할 수 있습니다🎜🎜스위치가 정확합니다. 존재하지 않는 대기열로 전송합니다.🎜🎜스위치가 메시지를 수신하고 성공 알림을 반환합니다. 및 콘솔 출력:🎜🎜【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
        [ack]true
        [cause]null
        [ 성공적으로 전송됨]🎜 🎜스위치가 대기열을 찾지 못했고 실패 정보를 반환했습니다. 🎜
        🎜[메시지 전송 실패]
        [message]제품 메시지
        [replyCode]312🎜🎜RabbitMQ🎜🎜큐 지속성을 활성화하면 생성된 큐와 스위치가기본 구성이 지속됩니다. 먼저 큐와 스위치를 올바르게 설정하고 메시지가 큐에 저장되도록 소비 모니터링용 큐를 수정합니다. 🎜🎜큐의 지속성을 비지속성으로 수정: 🎜rrreee🎜메시지를 보낸 후 메시지는 큐에 저장된 다음 RabbitMQ를 다시 시작하고 메시지는 더 이상 존재하지 않습니다.
        대기열 지속성 설정: 🎜rrreee🎜다시 시작한 후에도 대기열의 메시지는 계속 존재합니다. 🎜🎜소비자 측🎜🎜소비자 측은 기본적으로 ack 자동 확인 모드를 시작합니다. 대기열 메시지가 소비자에게 수신되면 대기열의 메시지가 있는지 여부에 관계없이 자동으로 삭제됩니다. 소비자 측의 메시지. 따라서 소비자가 메시지를 성공적으로 소비할 수 있도록 하려면 자동 모드를 수동 확인 모드로 변경하세요. 🎜🎜application.yml 파일을 수정하세요🎜rrreee🎜메시지를 소비하고 받은 후 수동 확인이 필요합니다. 🎜rrreeerrreee🎜If 추가되지 않음: 🎜rrreee🎜두 개의 메시지 보내기 🎜🎜메시지가 수신된 후 확인이 없으며 다시 대기열에 넣습니다. 🎜🎜🎜🎜🎜프로젝트를 다시 시작하면 대기열 메시지가 다음으로 전송됩니다. 소비자에게 전달되지만 확인 확인이 없으면 계속해서 대기열에 다시 들어가게 됩니다. 🎜

        channel.basicAck를 추가한 후 프로젝트를 다시 시작하세요channel.basicAck 之后,再重启项目

        SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법

        队列消息就被删除了

        basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

        multiple 设置成 true

        SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법

        SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법큐 메시지가 삭제됩니다

        🎜basicAck multiple 메서드의 마지막 매개변수는 삭제됨을 의미합니다. 삭제 전 큐. 🎜🎜multipletrue로 설정되고 모든 후속 대기열이 지워집니다 🎜🎜🎜🎜

    위 내용은 SpringBoot+RabbitMQ를 사용하여 안정적인 메시지 전송을 달성하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제