Heim  >  Artikel  >  Java  >  Wie SpringBoot den benutzerdefinierten RabbitMq-Nachrichtenüberwachungscontainer integriert, um die Nachrichtenstapelverarbeitung zu implementieren

Wie SpringBoot den benutzerdefinierten RabbitMq-Nachrichtenüberwachungscontainer integriert, um die Nachrichtenstapelverarbeitung zu implementieren

WBOY
WBOYnach vorne
2023-05-13 08:52:11970Durchsuche

SpringBoot integriert den benutzerdefinierten Nachrichten-Listening-Container von RabbitMq, um die Stapelverarbeitung von Nachrichten zu implementieren Durch die Integration können Nachrichten schnell gesendet und empfangen werden. In RabbitMQ erfolgt das Senden und Empfangen von Nachrichten asynchron, daher ist ein Listener erforderlich, der auf den Eingang von Nachrichten wartet. Spring Boot bietet einen Standard-Listener-Container. Manchmal müssen wir den Listener-Container jedoch anpassen, um bestimmte Anforderungen zu erfüllen, z. B. das Abrufen von Daten in Stapeln.

In diesem Artikel werden wir Spring Boot verwenden, um RabbitMQ zu integrieren und einen Listener-Container anzupassen, um die Funktion des stapelweisen Abrufens von Daten zu erreichen.

Voraussetzungen:

Bevor Sie beginnen, müssen Sie die folgenden Bedingungen erfüllen:

Haben Sie den RabbitMQ-Server installiert und beginnen.

  • Die zu verwendende Warteschlange wurde erstellt.

  • Ich bin bereits mit den Grundkenntnissen von Spring Boot und RabbitMQ vertraut.

  • Umgebungsvorbereitung:
Bevor wir beginnen, müssen wir die folgende Umgebung vorbereiten:


JDK 1.8 oder höher 🎜🎜#RabbitMQ 3.8.0 oder höher

  • Abhängigkeiten hinzufügen

  • Zuerst hinzufügen die folgenden Abhängigkeiten in der Datei pom.xml: #🎜 🎜#
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • Konfigurationsdatei

    Fügen Sie als Nächstes die folgende Konfiguration in der Datei application.properties hinzu:
  • spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    
    # 队列名称
    spring.rabbitmq.listener.simple.queue-name=myQueue
    
    # 最大并发消费者数量
    spring.rabbitmq.listener.simple.concurrency=5
    
    # 最小数量
    spring.rabbitmq.listener.simple.min-concurrency=1
    
    # 最大数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    # 批量处理消息的大小
    spring.rabbitmq.listener.simple.batch-size=50
    #🎜🎜 # oder
  • spring:
      rabbitmq:
        host: localhost
        listener:
          simple:
            batch-size: 50
            concurrency: 5
            max-concurrency: 10
            min-concurrency: 1
            queue-name: myQueue
        password: guest
        port: 5672
        username: guest
        virtual-host: /

    geschriebener Listener

  • Dann müssen wir eine Listener-Klasse erstellen, um die aus der Warteschlange empfangenen Nachrichten zu verarbeiten. Das Folgende ist ein einfaches Beispiel:
@Component
public class MyListener {
    
    @RabbitListener(queues = "myQueue", containerFactory = "myFactory")
    public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)
            throws IOException {
        try {
            // 处理消息
            System.out.println("Received " + messages.size() + " messages");
            for (Message message : messages) {
           		// 处理消息
            	System.out.println("Received message: " + new String(message.getBody()));
        	}
        	channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);
        } finally {
            // 手动确认消息
            channel.basicAck(deliveryTag, true);
        }
    }
}

Im obigen Code verwenden wir die Annotation @RabbitListener, um den Namen der abzuhörenden Warteschlange anzugeben, und geben außerdem die Verwendung der myFactory-Factory zum Erstellen des Abhörcontainers an. In diesem Listener drucken wir einfach die empfangene Nachricht aus.

SimpleRabbitListenerContainerFactory erstellen

Als nächstes müssen wir eine SimpleRabbitListenerContainerFactory-Factory erstellen, um das Verhalten des Listener-Containers anpassen zu können. Hier ist ein einfaches Beispiel:

@Configuration
public class RabbitMQConfig {

//    @Bean
//    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setConcurrentConsumers(1);
//        factory.setMaxConcurrentConsumers(10);
//        factory.setBatchListener(true);
//        factory.setBatchSize(50);
//        return factory;
//    }

	@Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MessageConverter messageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        
        // 并发消费者数,默认为 1
        factory.setConcurrentConsumers(5);
        
        // 最大并发消费者数,默认为 1
        factory.setMaxConcurrentConsumers(10);
        
        // 拒绝未确认的消息并重新将它们放回队列,默认为 true
        factory.setDefaultRequeueRejected(false);
        
        // 容器启动时是否自动启动,默认为 true
        factory.setAutoStartup(true);
        
        // 消息确认模式,默认为 AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        // 每个消费者在一次请求中预获取的消息数,默认为 1
        factory.setPrefetchCount(5);
        
        // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制
        factory.setReceiveTimeout(1000);
        
        // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务
        factory.setTransactionManager(transactionManager);
        
        // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息
        factory.setMessageConverter(messageConverter);
        
        // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor
        factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        
        // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒
        factory.setShutdownTimeout(10000);
        
        // 重试失败的消息之前等待的时间,默认为 5000 毫秒
        factory.setRecoveryInterval(5000);
        
        // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true
        factory.setMissingQueuesFatal(false);
        
        // 监听器容器连接工厂
        factory.setConnectionFactory(connectionFactory);

        return factory;
    }
}

Die meisten dieser Eigenschaften sind optional und können nach Bedarf festgelegt werden. Abhängig von den Anforderungen der Anwendung können wir diese Eigenschaften frei anpassen, um die Leistung und Zuverlässigkeit der Anwendung zu verbessern.

Nachricht senden

Schließlich können wir einen einfachen Nachrichtensendecode schreiben, um einige Nachrichten an die Warteschlange zu senden. Hier ist ein einfaches Beispiel:

@Component
public class MySender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("myQueue", "message:" + i);
        }
    }
}

Das obige ist der detaillierte Inhalt vonWie SpringBoot den benutzerdefinierten RabbitMq-Nachrichtenüberwachungscontainer integriert, um die Nachrichtenstapelverarbeitung zu implementieren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen