>  기사  >  Java  >  @KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법

@KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법

WBOY
WBOY앞으로
2023-05-13 14:01:061610검색

###첫 번째 단계, 동시 소비###
먼저 코드를 살펴보세요. 중요한 점은 ConcurrentKafkaListenerContainerFactory를 사용하고 Factory.setConcurrency(4)를 설정한다는 것입니다. (제 주제에는 속도를 높이기 위해 4개의 파티션이 있습니다.) 4로 설정, 즉 4개의 KafkaMessageListenerContainers가 있음)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

application.properties에 spring.kafka.listener.concurrency=3을 직접 추가한 다음 동시 소비를 위해 @KafkaListener를 사용할 수도 있습니다. .

###두 번째 단계는 일괄 소비###
그 다음은 일괄 소비입니다. 핵심은 Factory.setBatchListener(true);
and propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
하나는 일괄 소비를 활성화하는 것이고, 다른 하나는 일괄 소비가 각각 소비할 수 있는 최대 메시지 레코드 수를 설정하는 것입니다. 시간.

우리가 설정한 ConsumerConfig.MAX_POLL_RECORDS_CONFIG가 50이라는 점에 유의하는 것이 중요합니다. 이는 50개의 메시지에 도달하지 않아도 계속 대기한다는 의미는 아닙니다. 공식적인 설명은 "poll()에 대한 단일 호출에서 반환되는 최대 레코드 수"입니다. 즉, 50은 한 번의 폴링에서 반환되는 최대 레코드 수를 나타냅니다.

시작 로그를 보면 max.poll.interval.ms = 300000이라는 것을 알 수 있는데, 이는 max.poll.interval.ms 간격마다 한 번씩 poll을 호출한다는 의미입니다. 각 설문조사는 최대 50개의 레코드를 반환합니다.

max.poll.interval.ms 공식 설명은 "소비자 그룹 관리를 사용할 때 poll() 호출 간의 최대 지연입니다. 이는 더 많은 레코드를 가져오기 전에 소비자가 유휴 상태일 수 있는 시간의 상한을 지정합니다. 이 시간 초과가 만료되기 전에 poll()이 호출되지 않으면 소비자는 실패한 것으로 간주되고 그룹은 파티션을 다른 구성원에게 재할당하기 위해 재조정됩니다. ";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

시작 로그 스크린샷

@KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법

max.poll 정보 .records 및 max.poll.interval.ms의 공식 설명 스크린샷:

@KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법

###세 번째 단계, 파티션 소비###
파티션이 하나만 있는 주제의 경우 파티션 소비가 필요하지 않습니다. 의미가 없습니다. 다음 예는 2개의 파티션이 있는 경우입니다(내 전체 코드에는 4개의 listeningPartitionX 메소드가 있고 내 주제에는 4개의 파티션 세트가 있습니다). 독자는 자신의 상황에 따라 이를 조정할 수 있습니다.

아아아아

위 내용은 @KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제