首頁  >  文章  >  Java  >  springboot + rabbitmq如何用訊息確認

springboot + rabbitmq如何用訊息確認

coldplay.xixi
coldplay.xixi轉載
2020-07-01 17:35:342381瀏覽

springboot + rabbitmq如何用訊息確認

最近部門號召大夥多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術交流還是很有助於個人成長的。

於是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點KPI,就是想和大夥一起學習學習!

springboot + rabbitmq如何用訊息確認

相關學習推薦:Java影片教學

#這次我分享的是springboot rabbitmq 如何實現訊息確認機制,以及在實際開發中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這麼神奇,越是簡單的東西就越容易出錯。

可以看到使用了 RabbitMQ 以後,我們的業務連結明顯變長了,雖然做到了系統間的解耦,但可能造成訊息遺失的場景也增加了。例如:

  • 訊息生產者- > rabbitmq伺服器(訊息傳送失敗)

  • rabbitmq伺服器本身故障導致訊息遺失

  • #訊息消費者- > rabbitmq服務(消費訊息失敗)

springboot + rabbitmq如何用訊息確認
#所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟訊息確認機制以後,儘管很大程度上保證了訊息的準確送達,但由於頻繁的確認交互,rabbitmq 整體效率變低,吞吐量下降嚴重,不是非常重要的訊息真心不建議你用訊息確認機制。


下邊我們先來實作springboot rabbitmq訊息確認機制,再對遇到的問題做具體分析。

一、準備環境

1、引入rabbitmq 依賴套件

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、修改application.properties 設定

設定中需要開啟發送端消費端 的訊息確認。

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

3、定義Exchange 和Queue

定義交換器confirmTestExchange 和佇列confirm_test_queue ,並將佇列綁定在交換機上。

@Configurationpublic class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }}

rabbitmq 的訊息確認分為兩部分:傳送訊息確認 和 訊息接收確認。

springboot + rabbitmq如何用訊息確認

二、訊息發送確認

#發送訊息確認:用來確認生產者producer 將訊息傳送到brokerbroker 上的交換器exchange 再投遞給佇列queue的過程中,訊息是否成功投遞。

訊息從 producerrabbitmq broker#有一個 confirmCallback 確認模式。

訊息從 exchangequeue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達。

1、 ConfirmCallback確認模式

訊息只要被 rabbitmq broker 接收到就會觸發 confirmCallback 回呼 。

@Slf4j
@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }}

實作介面ConfirmCallback ,重寫其confirm()方法,方法內有三個參數correlationDataackcause

  • correlationData:物件內部只有一個 id 屬性,用來表示目前訊息的唯一性。
  • ack:訊息投遞到broker 的狀態,true表示成功。
  • cause:表示投遞失敗的原因。

但訊息被 broker 接收到只能表示已經到達 MQ伺服器,並不能保證訊息一定會被投遞到目標 queue 裡。所以接下來需要用到 returnCallback

2、 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

@Autowired    private RabbitTemplate rabbitTemplate;

    @Autowired    private ConfirmCallbackService confirmCallbackService;

    @Autowired    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消费者确认收到消息后,手动ack回执回调处理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投递到队列失败回调处理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 发送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

三、消息接收确认

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 {

    @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具体业务

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("消息已重复处理失败,拒绝再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {

                log.error("消息即将再次返回队列处理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

四、测试

发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。
springboot + rabbitmq如何用訊息確認
用抓包工具Wireshark 观察一下rabbitmq amqp协议交互的变化,也多了 ack 的过程。
springboot + rabbitmq如何用訊息確認

五、踩坑日志

1、不消息确认

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。
springboot + rabbitmq如何用訊息確認

2、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消费者 2 号收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

springboot + rabbitmq如何用訊息確認

本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

springboot + rabbitmq如何用訊息確認
而且rabbitmq management 只有一条未被确认的消息。

springboot + rabbitmq如何用訊息確認

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

以上是springboot + rabbitmq如何用訊息確認的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:learnku.com。如有侵權,請聯絡admin@php.cn刪除