>  기사  >  Java  >  Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법

Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법

王林
王林앞으로
2023-04-25 16:19:081275검색

    1. 지속성

    RabbitMQ 서비스가 중지되더라도 메시지 생성자가 보낸 메시지는 손실되지 않습니다. 기본적으로 RabbitMQ가 종료되거나 충돌할 때 대기열과 메시지는 무시됩니다. 메시지가 손실되지 않도록 하려면 큐와 메시지를 모두 지속성으로 표시해야 합니다.

    1.1 지속성 구현

    1. 대기열 지속성: 대기열을 생성할 때 channel.queueDeclare();의 두 번째 매개변수를 true로 변경합니다. channel.queueDeclare();第二个参数改为true。

    2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

    /**
     * @Description 持久化MQ
     * @date 2022/3/7 9:14
     */
    public class Producer3 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 持久化队列
            channel.queueDeclare(LONG_QUEUE,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            int i = 0;
            while (scanner.hasNext()){
                i++;
                String msg = scanner.next() + i;
                // 持久化消息
                channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息:'" + msg + "'成功");
            }
        }
    }

    但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

    1.2 不公平分发

    轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

    在消费者处修改channel.basicQos(1);表示开启不公平分发

    /**
     * @Description 不公平分发消费者
     * @date 2022/3/7 9:27
     */
    public class Consumer2 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 模拟并发沉睡三十秒
                try {
                    Thread.sleep(30000);
                    System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // 设置不公平分发
            channel.basicQos(1);
            channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                    consumerTag -> {
                        System.out.println(consumerTag + "消费者取消消费");
                    });
        }
    }

    1.3 测试不公平分发

    测试目的:是否能实现能者多劳。

    测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

    先启动生产者创建队列,再分别启动两个消费者。

    生产者按照顺序发四条消息:

    Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법

    睡眠时间短的线程A接收到了三条消息

    Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법

    而睡眠时间长的线程B只接收到的第二条消息:

    Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법

    因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

    实验成功!

    1.4 预取值

    消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

    这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

    1.4.1 代码测试

    测试方法:

    1.新建两个不同的消费者分别给定预期值5个2。

    2.给睡眠时间长的指定为5,时间短的指定为2。

    3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

    代码根据上述代码修改预期值即可。

    2. 发布确认

    发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

    需要注意的是需要开启队列持久化才能使用确认发布。
    开启方法:channel.confirmSelect();

    2.1 单个确认发布

    是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

    /**
     * @Description 确认发布——单个确认
     * @date 2022/3/7 14:49
     */
    public class SoloProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_solo";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 单个发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("发送消息:" + i);
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
    }

    2.2 批量确认发布

    一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

    /**
     * @Description 确认发布——批量确认
     * @date 2022/3/7 14:49
     */
    public class BatchProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_batch";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 设置一个多少一批确认一次。
            int batchSize = MESSAGE_COUNT / 10;
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 批量发布确认
                if (i % batchSize == 0){
                    if (channel.waitForConfirms()){
                        System.out.println("发送消息:" + i);
                    }
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    显然效率要比单个确认发布的高很多。

    2.3 异步确认发布

    在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

    /**
     * @Description 确认发布——异步确认
     * @date 2022/3/7 14:49
     */
    public class AsyncProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                System.out.println("未确认的消息:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    2.4 处理未确认的消息

    最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

    例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks

    2. 메시지 지속성: 채널을 사용하여 메시지를 보낼 때 channel.basicPublish();는 세 번째 매개변수를 MessageProperties.PERSISTENT_TEXT_PLAIN으로 변경하여 지속성 메시지를 나타냅니다.

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    그러나 메시지를 저장하는 데 캐시 간격이 있으며 실제로 디스크에 쓰는 일이 없으며 내구성 보장도 충분히 강력하지는 않지만 간단한 대기열에는 충분합니다.

    1.2 불공정한 배분

    폴링 배분 방식은 소비자마다 처리 효율이 다를 경우 적합하지 않습니다. 그러므로 진정한 공정성은 더 많은 일을 할 수 있는 사람이 더 많은 일을 해야 한다는 전제를 따라야 한다.

    불공정 분배를 활성화하도록 소비자에서 channel.basicQos(1);를 수정🎜
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    🎜1.3 불공정 분배 테스트🎜🎜🎜테스트 목적🎜: 할 수 있는 사람에게 도달할 수 있는지 여부 더 많은 일. 🎜🎜🎜테스트 방법🎜: 두 소비자가 서로 다른 이벤트를 잠자기하여 서로 다른 처리 이벤트를 시뮬레이션합니다. 처리 시간(수면 시간)이 짧고 여러 메시지를 처리할 수 있으면 목적이 달성됩니다. 🎜🎜먼저 생산자를 시작하여 대기열을 만든 다음 두 소비자를 각각 시작합니다. 🎜🎜🎜생산자는 4개의 메시지를 순서대로 보냅니다: 🎜🎜🎜Java 구현 방법 RabbitMQ 지속성 및 릴리스 확인🎜🎜잠자기 시간이 짧은 스레드 A가 세 개의 메시지를 받았습니다🎜🎜Java RabbitMQ가 지속성 및 릴리스 확인을 구현하는 방법🎜🎜오랫동안 휴면 상태인 스레드 B는 두 번째 메시지만 받았습니다: 🎜🎜Java RabbitMQ가 지속성 및 릴리스 확인을 구현하는 방법🎜🎜스레드 B는 메시지를 처리하는 데 오랜 시간이 걸리기 때문에 다른 메시지는 스레드 A에 할당됩니다. 🎜🎜실험 성공! 🎜🎜1.4 프리페치 값🎜🎜🎜 메시지 전송 및 수동 확인이 비동기식으로 완료되므로 확인되지 않은 메시지의 버퍼가 있습니다. 개발자는 확인되지 않은 메시지 문제를 방지하기 위해 버퍼 크기를 제한하기를 원합니다. 🎜🎜🎜여기서 예상되는 값은 위 메소드 channel.basicQos();의 매개변수입니다. 현재 채널에 매개변수와 동일한 메시지가 있는 경우 현재 채널은 소비하도록 정렬되지 않습니다. 메시지. 🎜
    1.4.1 코드 테스트
    🎜🎜테스트 방법: 🎜🎜🎜1. 서로 다른 두 개의 Consumer를 생성하고 각각 5와 2의 기대값을 제공합니다. 🎜🎜2. 긴 수면 시간을 5로, 짧은 수면 시간을 2로 지정합니다. 🎜🎜3. 지정된 기대값에 따라 메시지가 얻어지면 테스트에 성공한 것이지만 5와 2에 따라 배포된다는 의미는 아닙니다. 이는 가중치 판단과 유사합니다. 🎜🎜코드는 위 코드에 따라 예상 값을 수정할 수 있습니다. 🎜🎜2. 릴리스 확인 🎜🎜🎜 릴리스 확인은 생산자가 메시지를 대기열에 릴리스한 후 대기열이 이를 확인하고 유지한 다음 생산자에게 알리는 프로세스입니다. 이렇게 하면 메시지가 손실되지 않습니다. 🎜🎜🎜확인된 게시를 사용하려면 대기열 지속성을 켜야 한다는 점에 유의해야 합니다.
    열기 방법: channel.confirmSelect();🎜🎜2.1 단일 확인 게시🎜🎜🎜는 동기 게시 방법입니다. 즉, 메시지를 보낸 후 확인 및 게시된 후에만 입니다. , 후속 메시지는 계속 게시되며 지정된 시간 내에 확인이 없으면 예외가 발생합니다. 단점은 속도가 매우 느리다는 것입니다. 🎜🎜rrreee🎜2.2 일괄 확인 릴리스🎜🎜🎜 일괄 확인 릴리스는 시스템의 처리량을 향상시킬 수 있습니다. 하지만 게시에 실패하고 문제가 발생하면 전체 배치를 메모리에 저장했다가 나중에 다시 게시해야 한다는 단점이 있습니다. 🎜🎜rrreee🎜분명히 단일 확인 릴리스보다 효율성이 훨씬 높습니다. 🎜🎜2.3 비동기 확인 릴리스🎜🎜🎜는 위의 두 가지보다 프로그래밍이 더 복잡하지만 비용 효율적이며 신뢰성이 있든 효율적이든 콜백 기능을 사용하여 안정적인 메시지 전달을 달성합니다. 🎜🎜rrreee🎜2.4 확인되지 않은 메시지 처리 🎜🎜🎜확인되지 않은 메시지를 처리하는 가장 좋은 방법은 확인되지 않은 메시지를 게시 스레드에서 액세스할 수 있는 메모리 기반 대기열에 넣는 것입니다. 🎜🎜🎜예: ConcurrentLinkedQueue는 확인 대기열 콜백 확인과 게시 스레드 간에 메시지를 전송할 수 있습니다. 🎜🎜🎜처리 방법: 🎜🎜🎜1. 보낼 메시지를 모두 녹음하세요. 🎜🎜2. 확인되지 않은 메시지를 인쇄하세요. 🎜🎜해시 테이블을 사용하여 메시지를 저장하면 다음과 같은 장점이 있습니다. 🎜

    可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }

    위 내용은 Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제