首页 >Java >java教程 >Java RabbitMQ高级特性实例分析

Java RabbitMQ高级特性实例分析

WBOY
WBOY转载
2023-04-29 20:25:05859浏览

    消息的可靠投递

    在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

    • confirm 确认模式

    • return 退回模式

    rabbitmq整个消息投递的路径为:

    producer—>rabbitmq broker—>exchange—>queue—>consumer

    • 消息从producer到exchange则会返回一个confirmCallback

    • 消息从exchange—>queue投递失败则会返回一个returnCallback

    我们可以利用这两个callback控制消息的可靠性投递

    确认模式

    消息从 producer 到 exchange 则会返回一个 confirmCallback

    以spring整合rabbitmq为例,修改rabbitmq配置文件,在connectionFactory中添加publisher-confirms属性并设置值为true

    <!--
    * 确认模式:
    * 步骤:
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
    -->
    <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"/>
    /*
     * 确认模式:
     * 步骤:
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
        public void queueTest(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                    System.out.println("confirm方法被执行了....");
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //路由键与队列同名
            rabbitTemplate.convertAndSend("spring_queue", "message confirm....");
        }

    Java RabbitMQ高级特性实例分析

    因为正常向队列中发送了消息,所以返回的cause值为空,如果出现异常,cause为异常原因

    退回模式

    消息从 exchange–>queue 投递失败则会返回一个 returnCallback

    1.开启回退模式:publisher-returns=“true”

        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-returns="true"/>

    2.设置Exchange处理消息失败的模式:setMandatory,然后设置ReturnCallBack

        @Test
        public void queueTest(){
            //1.设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
            //2.设置ReturnCallBack
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * @param message    消息对象
                 * @param replyCode  错误码
                 * @param replyText  错误信息
                 * @param exchange   交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String
                        replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    //处理
                }
            });
            //手动添加错误路由模拟错误发生
            rabbitTemplate.convertAndSend("spring_topic_exchange", "return123", "return message...");
        }

    此处只有发生错误才会返回消息,因此手动加上一个错误,给发送消息添加路由值return123,实际上并没有这个路由,运行返回消息如下。

    Java RabbitMQ高级特性实例分析

    Consumer Ack

    ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

    有三种确认方式:

    • 自动确认:acknowledge=“none”

    • 手动确认:acknowledge=“manual”

    • 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,没有进行学习)

    其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

    还是以spring整合rabbitmq为例,rabbitmq配置文件中设置确认方式

    <rabbit:listener-container connection-factory="connectionFactory"
    acknowledge="manual">
    .....

    监听类代码如下:

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                // 3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
                //channel.basicReject(deliveryTag,true);
            }
        }
    }

    因为出现异常调用channel.basicNack()方法,让其自动重新发送消息,所以无限循环输出内容

    Java RabbitMQ高级特性实例分析

    消费端限流

    Java RabbitMQ高级特性实例分析

    当我们的 Rabbitmq 服务器积压了有上万条未处理的消息时,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(给channel或者consume设置Qos值)未被确认前,不进行消费新消息。

    1.确保ack机制为手动确认

    2.listener-container配置属性perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
            <rabbit:listener ref="topicListenerACK" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>

    生产者,发送五条消息

        @Test
        public void topicTest(){
    /**
     * 参数1:交换机名称
     * 参数2:路由键名
     * 参数3:发送的消息内容
     */
            for (int i=0;i<5;i++){
                rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.a", "发送到spring_topic_exchange交换机xzk.cn的消息"+i);
            }
        }
    }

    生产者注释掉channel.basicAck(deliveryTag,true)即不确认收到消息

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                // 3. 手动签收
                //channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }

    此时启动消费者再运行生产者之后,发现消费者发送了五条消息,实际上生产者只接受到了一条消息,达到限流作用

    Java RabbitMQ高级特性实例分析

    观察rabbitmq控制台,发现有1条unack消息。4条ready消息,还没到达consumer。和我们设置的prefetchCount=1限流情况相符。

    Java RabbitMQ高级特性实例分析

    把channel.basicAck(deliveryTag,true)的注释取消掉,即可以自动确认收到消息,重新运行消费者,接收到了另外的四条消息

    Java RabbitMQ高级特性实例分析

    Java RabbitMQ高级特性实例分析

    TTL(Time To Live)

    Time To Live,消息过期时间设置

    设置某个队列为过期队列

    设置交换机,队列以及队列过期时间为10000ms

     <!--ttl-->
        <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:topic-exchange name="test_exchange_ttl">
            <rabbit:bindings>
                <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>

    生产者发送10条消息

        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
            }

    Java RabbitMQ高级特性实例分析

    十秒钟后,过期消息消失

    Java RabbitMQ高级特性实例分析

    设置单独某个消息过期

    设置交换机和队列

    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"/>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>     
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生产者发送特定过期消息,用到了MessagePostProcessor这个api

     @Test
        public void testTtl() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //1.设置message信息
                    message.getMessageProperties().setExpiration("5000");//消息的过期时间
                    //2.返回该消息
                    return message;
                }
            };
            //消息单独过期
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
        }

    Java RabbitMQ高级特性实例分析

    5s之后

    Java RabbitMQ高级特性实例分析

    注:

    1.如果同时设置队列过期和消息过期,系统会根据哪个过期的时间短而选用哪儿个。

    2.设置单独消息过期时,如果该消息不为第一个接受的消息,则不过期。

    死信队列

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Deadmessage后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    Java RabbitMQ高级特性实例分析

    消息成为死信的三种情况:

    • 队列消息长度到达限制;

    • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    • 原队列存在消息过期设置,消息到达超时时间未被消费;

    队列绑定死信交换机:

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

    Java RabbitMQ高级特性实例分析

    实现

    1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    3.生产端测试

    /**
    * 发送测试死信消息:
    * 1. 过期时间
    * 2. 长度限制
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        //2. 测试长度限制后,消息死信
        /* for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        }*/
        //3. 测试消息拒收
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    }

    4.消费端监听

    public class DlxListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">
    </rabbit:listener>

    延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。c

    需求:

    1.下单后,30分钟未支付,取消订单,回滚库存。

    2.新用户注册成功7天后,发送短信问候。

    实现方式:

    • 定时器

    • 延迟队列

    定时器的实现方式不够优雅,我们采取延迟队列的方式

    Java RabbitMQ高级特性实例分析

    不过很可惜,在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    Java RabbitMQ高级特性实例分析

    配置

    <!--
    延迟队列:
            1. 定义正常交换机(order_exchange)和队列(order_queue)
            2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3. 绑定,设置正常队列过期时间为30分钟
    -->
    <!-- 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!-- 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生产端测试

    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
        /*//2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }*/
    }

    消费端监听

    public class OrderListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		try {
    			//1.接收转换消息
    			System.out.println(new String(message.getBody()));
    			//2. 处理业务逻辑
    			System.out.println("处理业务逻辑...");
    			System.out.println("根据订单id查询其状态...");
    			System.out.println("判断状态是否为支付成功");
    			System.out.println("取消订单,回滚库存....");
    			//3. 手动签收
    			channel.basicAck(deliveryTag,true);
    		} catch (Exception e) {
    			//e.printStackTrace();
    			System.out.println("出现异常,拒绝接受");
    			//4.拒绝签收,不重回队列 requeue=false
    			channel.basicNack(deliveryTag,true,false);
    		}
    	}
    }
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">
    </rabbit:listener>

    以上是Java RabbitMQ高级特性实例分析的详细内容。更多信息请关注PHP中文网其他相关文章!

    声明:
    本文转载于:yisu.com。如有侵权,请联系admin@php.cn删除