Rumah >Java >javaTutorial >Cara SpringBoot menyepadukan bekas mendengar mesej tersuai RabbitMq untuk melaksanakan pemprosesan kelompok mesej

Cara SpringBoot menyepadukan bekas mendengar mesej tersuai RabbitMq untuk melaksanakan pemprosesan kelompok mesej

WBOY
WBOYke hadapan
2023-05-13 08:52:111016semak imbas

SpringBoot menyepadukan bekas mendengar mesej tersuai RabbitMq untuk melaksanakan pemprosesan kelompok mesej

Kata Pengantar

RabbitMQ ialah baris gilir mesej yang biasa digunakan telah menyepadukan dengan mendalam dan boleh merealisasikan penghantaran dan menerima mesej. Dalam RabbitMQ, penghantaran dan penerimaan mesej adalah tidak segerak, jadi pendengar diperlukan untuk mendengar ketibaan mesej. Spring Boot menyediakan bekas pendengar lalai, tetapi kadangkala kita perlu menyesuaikan bekas pendengar untuk memenuhi beberapa keperluan khas, seperti mendapatkan data dalam kelompok.

Dalam artikel ini, kami akan menggunakan Spring Boot untuk menyepadukan RabbitMQ dan menyesuaikan bekas pendengar untuk mencapai fungsi mendapatkan data dalam kelompok.
Prasyarat:
Sebelum bermula, anda perlu mempunyai syarat berikut:

  • Pelayan RabbitMQ telah dipasang dan dimulakan.

  • Baris gilir yang akan digunakan telah dibuat.

  • Sudah biasa dengan pengetahuan asas Spring Boot dan RabbitMQ.

Persediaan persekitaran:
Sebelum bermula, kita perlu menyediakan persekitaran berikut:

  • JDK 1.8 atau Versi di atas

  • Spring Boot 2.5.0 ke atas versi

  • RabbitMQ 3.8.0 ke atas versi

Tambah kebergantungan

Mula-mula, tambahkan kebergantungan berikut dalam fail pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Fail konfigurasi

Seterusnya, tambahkan konfigurasi berikut dalam aplikasi. properties fail :

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 队列名称
spring.rabbitmq.listener.simple.queue-name=myQueue

# 最大并发消费者数量
spring.rabbitmq.listener.simple.concurrency=5

# 最小数量
spring.rabbitmq.listener.simple.min-concurrency=1

# 最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

# 批量处理消息的大小
spring.rabbitmq.listener.simple.batch-size=50

atau

spring:
  rabbitmq:
    host: localhost
    listener:
      simple:
        batch-size: 50
        concurrency: 5
        max-concurrency: 10
        min-concurrency: 1
        queue-name: myQueue
    password: guest
    port: 5672
    username: guest
    virtual-host: /

Menulis Pendengar

Kemudian, kita perlu mencipta kelas pendengar untuk mengendalikan mesej yang diterima daripada baris gilir. Berikut ialah contoh mudah:

@Component
public class MyListener {
    
    @RabbitListener(queues = "myQueue", containerFactory = "myFactory")
    public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)
            throws IOException {
        try {
            // 处理消息
            System.out.println("Received " + messages.size() + " messages");
            for (Message message : messages) {
           		// 处理消息
            	System.out.println("Received message: " + new String(message.getBody()));
        	}
        	channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);
        } finally {
            // 手动确认消息
            channel.basicAck(deliveryTag, true);
        }
    }
}

Dalam kod di atas, kami menggunakan anotasi @RabbitListener untuk menentukan nama baris gilir untuk didengar, dan juga menyatakan penggunaan kilang myFactory untuk mencipta bekas mendengar. Dalam pendengar ini, kami hanya mencetak mesej yang diterima.

Buat SimpleRabbitListenerContainerFactory

Seterusnya, kita perlu mencipta kilang SimpleRabbitListenerContainerFactory untuk dapat menyesuaikan tingkah laku bekas mendengar. Berikut ialah contoh mudah:

@Configuration
public class RabbitMQConfig {

//    @Bean
//    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setConcurrentConsumers(1);
//        factory.setMaxConcurrentConsumers(10);
//        factory.setBatchListener(true);
//        factory.setBatchSize(50);
//        return factory;
//    }

	@Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MessageConverter messageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        
        // 并发消费者数,默认为 1
        factory.setConcurrentConsumers(5);
        
        // 最大并发消费者数,默认为 1
        factory.setMaxConcurrentConsumers(10);
        
        // 拒绝未确认的消息并重新将它们放回队列,默认为 true
        factory.setDefaultRequeueRejected(false);
        
        // 容器启动时是否自动启动,默认为 true
        factory.setAutoStartup(true);
        
        // 消息确认模式,默认为 AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        // 每个消费者在一次请求中预获取的消息数,默认为 1
        factory.setPrefetchCount(5);
        
        // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制
        factory.setReceiveTimeout(1000);
        
        // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务
        factory.setTransactionManager(transactionManager);
        
        // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息
        factory.setMessageConverter(messageConverter);
        
        // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor
        factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        
        // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒
        factory.setShutdownTimeout(10000);
        
        // 重试失败的消息之前等待的时间,默认为 5000 毫秒
        factory.setRecoveryInterval(5000);
        
        // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true
        factory.setMissingQueuesFatal(false);
        
        // 监听器容器连接工厂
        factory.setConnectionFactory(connectionFactory);

        return factory;
    }
}

Kebanyakan sifat ini adalah pilihan dan boleh ditetapkan mengikut keperluan. Bergantung pada keperluan aplikasi, kami bebas untuk menyesuaikan sifat ini untuk meningkatkan prestasi dan kebolehpercayaan aplikasi.

Hantar Mesej

Akhir sekali, kita boleh menulis kod mesej hantar mudah untuk menghantar beberapa mesej ke baris gilir. Berikut ialah contoh mudah:

@Component
public class MySender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("myQueue", "message:" + i);
        }
    }
}

Atas ialah kandungan terperinci Cara SpringBoot menyepadukan bekas mendengar mesej tersuai RabbitMq untuk melaksanakan pemprosesan kelompok mesej. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam