Rumah >Java >javaTutorial >Cara menggunakan @KafkaListener untuk menerima mesej secara serentak dalam kumpulan dalam Spring Boot
###Langkah pertama, penggunaan serentak###
Lihat kod dahulu Perkara utama ialah kami menggunakan ConcurrentKafkaListenerContainerFactory dan set factory.setConcurrency(4); Untuk mempercepatkan penggunaan, tetapkan konkurensi kepada 4, iaitu, terdapat 4 KafkaMessageListenerContainers)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
Ambil perhatian bahawa anda juga boleh menambah spring.kafka.listener.concurrency=3 terus ke application.properties, dan kemudian gunakan @KafkaListener untuk penggunaan serentak.
###Langkah kedua ialah penggunaan batch###
Kemudian terdapat penggunaan batch. Perkara utama ialah factory.setBatchListener(true);
dan propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
Satu adalah untuk mendayakan penggunaan kelompok, dan satu lagi adalah untuk menetapkan bilangan maksimum rekod mesej yang kumpulan penggunaan boleh mengambil setiap masa.
Perlu ambil perhatian bahawa ConsumerConfig.MAX_POLL_RECORDS_CONFIG yang kami tetapkan ialah 50, ini tidak bermakna kami akan terus menunggu jika 50 mesej tidak dicapai. Penjelasan rasmi ialah "Bilangan maksimum rekod yang dikembalikan dalam satu panggilan ke tinjauan pendapat () iaitu, 50 mewakili bilangan maksimum rekod yang dikembalikan dalam satu tinjauan pendapat.
Daripada log permulaan, kita dapat melihat bahawa terdapat max.poll.interval.ms = 300000, yang bermaksud bahawa kita memanggil tinjauan pendapat sekali setiap selang max.poll.interval.ms. Setiap tinjauan pendapat mengembalikan sehingga 50 rekod.
penjelasan rasmi max.poll.interval.ms ialah "Lengah maksimum antara seruan poll() apabila menggunakan pengurusan kumpulan pengguna. Ini meletakkan had atas pada jumlah masa pengguna boleh melahu sebelum mengambil lebih banyak rekod. Jika tinjauan pendapat() tidak dipanggil sebelum tamat tempoh masa ini, maka pengguna dianggap gagal dan kumpulan akan mengimbangi semula untuk menetapkan semula partition kepada ahli lain ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public MapconsumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
Tangkapan skrin log permulaan
Tangkapan skrin penjelasan rasmi tentang max.poll.records dan max.poll.interval.ms:
# ## Langkah ketiga, penggunaan partition####
Untuk topik dengan hanya satu partition, penggunaan partition tidak diperlukan kerana ia tidak bermakna. Contoh berikut adalah untuk kes di mana terdapat 2 partition (terdapat 4 kaedah listenPartitionX dalam kod lengkap saya, dan topik saya mempunyai 4 partition set Pembaca boleh menyesuaikannya mengikut situasi mereka sendiri).
Atas ialah kandungan terperinci Cara menggunakan @KafkaListener untuk menerima mesej secara serentak dalam kumpulan dalam Spring Boot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!