在使用RabbitMQ
的時候,生產者在進行訊息投遞的時候如果想知道訊息是否成功的投遞到對應的交換器和在佇列中,有兩種方式可以用來控制訊息投遞的可靠性模式。
由上圖的整個訊息的投遞過程來看,生產者的訊息進入中間件會先到達交換機,然後再從交換器傳遞到佇列中去,也就是分為兩步驟戰略。那麼訊息的遺失情況也就是會出現在這兩個階段中,RabbitMQ 貼心的為我們提供了針對於這兩個部分的可靠新傳遞模式:
## confirm 模式。
return 模式。
confirmCallback 的回呼。可以直接在
rabbitTemplate 實例中進行確認邏輯的設定。如果是使用
XML 設定的話需要在工廠設定開啟
publisher-confirms="true",YAML 的設定就直接
publisher-confirm-type : correlated,他預設是NONE ,需要手動開啟。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(); if (!b) { // 消息重发之类的处理 System.out.println(s); } else { System.out.println("交换机成功接收消息"); } } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }上面的確認是由一個
confirm 的函數執行的,裡面攜帶了三個參數,第一個是配置的相關信息,第二個表示交換器是否成功的接收到訊息,第三個參數是指沒有成功接收訊息的原因。
returnCallback 。在工廠配置中開啟回退模式
publisher-returns="true" ,設定交換器處理訊息失敗的模式(預設 false 直接將訊息丟棄),並新增退回處理的邏輯。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重发逻辑处理 System.out.println(message.getBody() + " 投递消息队列失败"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
returnedMessage 中攜帶五個參數、分別指的是訊息物件、錯誤碼、錯誤訊息、交換器、路由鍵。
自動確認:acknowledge="none"
:acknowledge="manual"
:acknowledge="auto"
如果設定手動確認的方式,就需要在正常消費訊息之後回呼確認
channel.basicAck(),手動簽收。如果業務處理過程中發生了異常則呼叫 channel.basicNack()
重新傳送訊息。 首先需要在佇列綁定時進行確認機制的配置,設定為手動簽收。
<!-- 绑定队列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
生產者一端不用更改,只需要改變消費者的實現進行訊息自動簽收就可以了,正常執行業務則簽收訊息,業務發生錯誤則選擇訊息拒簽,訊息重發或丟棄。
public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息异常"); channel.basicNack(tag, true, true); e.printStackTrace(); } } }
裡面涉及三個簡單的簽收函數,一是正確簽收的
basicAck ,二是單條拒籤的basicReject
,三是批量拒籤的basicNack
。
第一個參數表示訊息在通道中的唯一ID,只針對目前的Channel;第二個參數表示是否批量同意,如果是false的話只會同意簽收目前ID的一條訊息,將其從訊息佇列中刪除,而如果是true 的話將會將此ID之前的訊息一起給同意簽收了。
第一個參數依舊表示訊息的唯一ID,第二個參數表示是否重新回隊發送,false 表示直接丟棄該訊息或有死信佇列可以接收, true 則表示重新回隊進行訊息傳送,所有操作只針對目前的訊息。
比第二個多了一個參數,也就是處於中間位置的布林值,表示是否批量進行。
在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。
只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。
<!-- 绑定队列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。
队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。
如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置 message 的过期时间 message.getMessageProperties().setExpiration("5000"); // 返回该消息 return message; } }; rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。
死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。
消息成为死信消息有几种情况
队列的消息长度达到限制
消费者拒接消息的时候不把消息重新放入队列中
队列存在消息过期设置,消息超时未被消费
消息存在过期时间,在投递给消费者时发现过期
在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交换机 --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 队列过期时间 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 队列长度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments> </rabbit:queue>
延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。
消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL
+ 死信队列
的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。
再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。
例如实现一个下单超时支付取消订单的功能:
以上是java中RabbitMQ高階應用方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!