>  기사  >  Java  >  Java의 RabbitMQ 고급 응용 프로그램 방법

Java의 RabbitMQ 고급 응용 프로그램 방법

王林
王林앞으로
2023-04-30 10:40:06712검색

1. 안정적인 메시지 전달

RabbitMQ을 사용할 때, Producer가 메시지 전달 시 해당 스위치와 큐에 메시지가 성공적으로 전달되었는지 알고 싶다면 이 방법은 두 가지가 있습니다. 메시지 전달의 신뢰성 모드를 제어하는 ​​데 사용할 수 있습니다. RabbitMQ 的时候,生产者在进行消息投递的时候如果想知道消息是否成功的投递到对应的交换机和队列中,有两种方式可以用来控制消息投递的可靠性模式 。

Java의 RabbitMQ 고급 응용 프로그램 방법

 由上图的整个消息的投递过程来看,生产者的消息进入到中间件中会首先到达交换机,然后再从交换机传递到队列中去,也就是分为两步走战略。那么消息的丢失情况也就是会出现在这两个阶段中,RabbitMQ 贴心的为我们提供了针对于这两个部分的可靠新传递模式:

  • confirm 模式

  • return 模式

 利用这两个回调模式来确保消息的传递可靠。

 1.1、确认模式

 消息从生产者到交换机之间传递会返回一个 confirmCallback 的回调。可以直接在 rabbitTemplate 实例中进行确认逻辑的设置。如果是使用 XML 配置的话需要在工厂配置开启 publisher-confirms="true"YAML 的配置就直接 publisher-confirm-type: correlated,他默认是 NONE ,需要手动开启。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println();
                if (!b) {
                    //	消息重发之类的处理
                    System.out.println(s);
                } else {
                    System.out.println("交换机成功接收消息");
                }
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

 上面的确认是由一个 confirm 的函数执行的,里面携带了三个参数,第一个是配置的相关信息,第二个表示交换机是否成功的接收到消息,第三个参数是指没有成功接收消息的原因。

 1.2、退回模式

 从交换机到消息队列投递失败会返回一个 returnCallback 。在工厂配置中开启回退模式 publisher-returns="true" ,设置交换机处理消息失败的模式(默认 false 直接将消息进行丢弃),添加退回处理的逻辑。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  重发逻辑处理
                System.out.println(message.getBody() + " 投递消息队列失败");
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

returnedMessage 中携带五个参数、分别指的是消息对象、错误码、错误信息、交换机、路由键。

 1.3、确认机制

 在消费者抓取消息队列中的数据取消费之后会有一个确认机制进行消息的确认,防止因为抓取消息之后但没有消费成功而导致的消息丢失。有三种确认方式:

  • 自动确认acknowledge="none"

  • 手动确认acknowledge="manual"

  • 根据异常情况确认acknowledge="auto"

 其中自动确认是指一旦消息被消费者抓取就自动默认成功,并将消息从消息队列中进行移除,如果这个时候消费端消费出现问题,那么也会是默认消息消费成功,但是实际上是没有消费成功的,也就是当前的消息丢失了。默认的情况就是自动确认机制。

 如果设置手动确认的方式,就需要在正常消费消息之后进行回调确认 channel.basicAck(),手动签收。如果业务处理过程中发生了异常则调用 channel.basicNack() 重新发送消息。

 首先需要在队列绑定时进行确认机制的配置,设置为手动签收。

<!-- 绑定队列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

 生产者一端不用更改,只需要改变消费者的实现进行消息自动签收就可以了,正常执行业务则签收消息,业务发生错误则选择消息拒签,消息重发或者丢弃。

public class ConsumerAck implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //  消息唯一ID
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = new String(message.getBody(), "utf-8");
            channel.basicAck(tag, true);
            System.out.println("接收消息: " + msg);
        } catch (Exception e) {
            System.out.println("接收消息异常");
            channel.basicNack(tag, true, true);
            e.printStackTrace();
        }
    }
}

 里面涉及三个简单的签收函数,一是正确签收的 basicAck ,二是单条拒签的 basicReject ,三是批量拒签的 basicNack

Java의 RabbitMQ 고급 응용 방법
  •  사진에서 위의 전체 메시지 전달 과정을 보면, 미들웨어에 진입한 생산자의 메시지는 먼저 스위치에 도착한 후 스위치에서 큐로 전달되는데, 이는 2단계 전략이다. 그런 다음 이 두 단계에서 메시지 손실이 발생합니다. RabbitMQ는 이 두 부분에 대해 신뢰할 수 있는 새로운 전달 모드인

  • 확인 모드
  • 를 신중하게 제공합니다.

  • 복귀 모드
  • .

 신뢰할 수 있는 메시지 전달을 보장하려면 이 두 가지 콜백 모드를 사용하세요.

1.1. 확인 모드

🎜  메시지가 생산자에서 스위치로 전달되면 confirmCallback 콜백이 반환됩니다. 확인 로직은 rabbitTemplate 인스턴스에서 직접 설정할 수 있습니다. XML을 사용하여 구성하는 경우 공장 구성에서 🎜publisher-confirms="true"🎜를 활성화해야 합니다. YAML 구성은 직접 🎜publisher-confirm-입니다. 유형: 상관🎜. 기본값은 NONE이며 수동으로 켜야 합니다. 🎜
<!-- 绑定队列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
                           acknowledge="manual" prefetch="1">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>
🎜 위 확인은 세 가지 매개변수를 전달하는 confirm 함수에 의해 실행됩니다. 첫 번째는 구성 관련 정보이고, 두 번째는 스위치가 메시지를 성공적으로 수신했는지 여부를 나타냅니다. , 세 번째 매개변수는 메시지가 성공적으로 수신되지 않은 이유를 나타냅니다. 🎜

1.2. 반환 모드

🎜 스위치에서 메시지 대기열로의 전달이 실패하면 returnCallback이 반환됩니다. 공장 구성에서 폴백 모드 🎜publisher-returns="true"🎜를 켜고 스위치가 메시지 처리에 실패하도록 모드를 설정하고(기본값은 false이며 메시지를 직접 삭제) 폴백 처리 로직을 추가합니다. 🎜
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>
🎜 returnedMessage는 각각 메시지 개체, 오류 코드, 오류 메시지, 스위치 및 라우팅 키를 참조하는 5개의 매개변수를 전달합니다. 🎜

1.3. 확인 메커니즘

🎜 소비자가 메시지 대기열의 데이터를 가져오고 소비를 취소한 후에는 메시지를 가져온 후 소비 실패를 방지하기 위해 메시지를 확인하는 확인 메커니즘이 있습니다. . 메시지가 손실됩니다. 세 가지 확인 방법이 있습니다. 🎜🎜🎜🎜🎜자동 확인🎜: acknowledge="none"🎜🎜🎜🎜🎜수동 확인🎜: acknowledge="manual"🎜🎜 🎜 🎜🎜비정상적인 조건에 따라 확인🎜: acknowledge="auto"🎜🎜🎜  자동 확인은 메시지가 소비자에 의해 캡처되면 자동으로 기본값이 됨을 의미합니다. 이때 소비자 소비에 문제가 있으면 기본 메시지 소비는 성공하지만 실제로는 소비에 성공하지 않습니다. 즉, 현재 메시지 분실되었습니다. 기본값은 자동 확인 메커니즘입니다. 🎜🎜 수동 확인 방법을 설정한 경우 메시지를 정상적으로 소비한 후 콜백 확인 channel.basicAck()을 수행하고 수신을 위해 수동으로 서명해야 합니다. 업무 처리 중 예외가 발생하면 channel.basicNack()를 호출하여 메시지를 다시 보내세요. 🎜🎜 먼저 대기열을 바인딩할 때 확인 메커니즘을 구성하고 수동 서명으로 설정해야 합니다. 🎜
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	设置 message 的过期时间
        message.getMessageProperties().setExpiration("5000");
        //	返回该消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
🎜  생성자 측을 변경할 필요는 없습니다. 메시지에 자동으로 서명하려면 소비자의 구현만 변경하면 됩니다. 비즈니스 오류가 발생하면 메시지가 서명됩니다. 메시지가 거부되고 메시지가 다시 전송되거나 삭제됩니다. 🎜
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!-- 死信交换机 -->
        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
        <!-- 路由 -->
        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
        <!-- 队列过期时间 -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <!-- 队列长度 -->
        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>
🎜 세 가지 간단한 서명 기능이 포함됩니다. 하나는 올바른 서명을 위한 basicAck, 두 번째는 단일 거부를 위한 basicReject, 세 번째는 일괄 처리를 위한 입니다. 거부 basicNack. 🎜🎜🎜🎜🎜basicAck🎜 첫 번째 매개변수는 현재 채널에만 적용되는 채널 내 메시지의 고유 ID를 나타내며, 두 번째 매개변수는 일괄 동의 여부를 나타냅니다. ID는 서명에 동의합니다. 메시지 대기열에서 삭제되며, true인 경우 이 ID 이전의 메시지가 함께 서명됩니다. 🎜🎜🎜🎜🎜basicReject🎜 첫 번째 매개변수는 여전히 메시지의 고유 ID를 나타내고, 두 번째 매개변수는 전송을 위해 다시 대기열할지 여부를 나타냅니다. false는 메시지가 직접 삭제되거나 메시지를 수신하기 위한 배달 못한 편지 대기열이 있음을 나타냅니다. 메시지를 다시 보내려면 모든 작업이 현재 메시지에 대해서만 수행되어야 함을 나타냅니다. 🎜🎜🎜🎜🎜basicNack🎜에는 두 번째 매개변수보다 매개변수가 하나 더 있는데, 중간에 배치 여부를 나타내는 부울 값이 있습니다. 🎜

2、消费端限流

 在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。

Java의 RabbitMQ 고급 응용 프로그램 방법

 只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。

<!-- 绑定队列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
                           acknowledge="manual" prefetch="1">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

3、消息过期时间

 消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。

 队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。

 如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	设置 message 的过期时间
        message.getMessageProperties().setExpiration("5000");
        //	返回该消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。

4、死信队列

 4.1、死信概念

死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。

Java의 RabbitMQ 고급 응용 프로그램 방법

  • 消息成为死信消息有几种情况

    队列的消息长度达到限制

    消费者拒接消息的时候不把消息重新放入队列中

    队列存在消息过期设置,消息超时未被消费

    消息存在过期时间,在投递给消费者时发现过期

 在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!-- 死信交换机 -->
        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
        <!-- 路由 -->
        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
        <!-- 队列过期时间 -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <!-- 队列长度 -->
        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 4.2、延迟队列

 延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。

 消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL + 死信队列 的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。

 再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。

 例如实现一个下单超时支付取消订单的功能:

Java의 RabbitMQ 고급 응용 프로그램 방법

위 내용은 Java의 RabbitMQ 고급 응용 프로그램 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제