Rumah  >  Artikel  >  Java  >  Bagaimana Spring Boot menyepadukan Kafka

Bagaimana Spring Boot menyepadukan Kafka

WBOY
WBOYke hadapan
2023-06-02 14:18:351399semak imbas

Langkah 1: Tambah kebergantungan

Tambah kebergantungan berikut dalam pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

Langkah 2: Konfigurasikan Kafka

Tambah dalam fail application.yml Perkara berikut konfigurasi:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

Di sini kami mengkonfigurasi alamat perkhidmatan Kafka sebagai localhost:9092, mengkonfigurasi ID kumpulan pengguna sebagai my-group dan menetapkan offset terawal untuk membaca mesej. Di pihak pengeluar, kami mengkonfigurasi penyeri mesej sebagai StringSerializer.

Langkah 3: Buat penerbit

Kami kini akan mencipta pengeluar Kafka untuk menghantar mesej ke pelayan Kafka. Di sini kami akan membuat titik akhir API RESTful untuk menerima permintaan POST dan menghantar mesej kepada Kafka.

Pertama, kami akan mencipta kelas KafkaProducerConfig yang akan digunakan untuk mengkonfigurasi pengeluar Kafka:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Dalam kod di atas, kami menggunakan anotasi @Configuration untuk mengisytiharkan KafkaProducerConfig kelas sebagai kelas Konfigurasi. Kami kemudian menyuntik atribut @Value dalam fail konfigurasi menggunakan anotasi bootstrap-servers.

Seterusnya, kami mencipta kaedah producerConfigs yang menetapkan konfigurasi pengeluar Kafka. Di sini, kami menetapkan tiga atribut BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG dan VALUE_SERIALIZER_CLASS_CONFIG.

Kemudian, kami mencipta kaedah producerFactory yang mencipta kilang pengeluar Kafka. Di sini kami telah menggunakan kelas DefaultKafkaProducerFactory dan lulus konfigurasi kami.

Akhir sekali, kami mencipta kaedah kafkaTemplate yang mencipta kejadian KafkaTemplate. Di sini, kami menggunakan kilang pengeluar yang baru kami buat sebagai parameter dan mengembalikan contoh KafkaTemplate.

Seterusnya, kami akan mencipta titik akhir RESTful yang menerima permintaan POST dan menghantar mesej kepada Kafka. Di sini, kami akan mencipta pengawal RESTful menggunakan anotasi @RestController:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

Dalam kod di atas, kami telah menggunakan anotasi @Autowired untuk menyuntik tika KafkaTemplate ke dalam kelas KafkaController. Kemudian, kami mencipta kaedah sendMessage untuk menghantar mesej kepada Kafka.

Di sini kami menggunakan kaedah kafkaTemplate.send untuk menghantar mesej kepada topik my-topic. Kaedah hantar mengembalikan objek ListenableFuture untuk pemprosesan hasil tak segerak.

Langkah 4: Buat pengguna

Sekarang, kami akan mencipta pengguna Kafka untuk menerima mesej daripada pelayan Kafka. Di sini kami akan membuat kumpulan pengguna dan mengkonfigurasinya untuk membaca mesej daripada topik my-topic.

Pertama, kami akan mencipta kelas KafkaConsumerConfig yang akan digunakan untuk mengkonfigurasi pengguna Kafka:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Dalam kod di atas, kami menggunakan anotasi @Configuration untuk mengisytiharkan KafkaConsumerConfig kelas sebagai Konfigurasikan kelas dan dayakan Kafka menggunakan anotasi @EnableKafka.

Kemudian, kami menggunakan anotasi @Value untuk menyuntik atribut bootstrap-servers dan consumer.group-id dalam fail konfigurasi.

Seterusnya, kami mencipta kaedah consumerConfigs yang menetapkan konfigurasi pengguna Kafka. Di sini, kami menetapkan lima atribut: BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG dan VALUE_DESERIALIZER_CLASS_CONFIG.

Kemudian, kami mencipta kaedah consumerFactory yang mencipta kilang pengguna Kafka. Di sini kami telah menggunakan kelas DefaultKafkaConsumerFactory dan lulus konfigurasi kami.

Akhir sekali, kami mencipta kaedah kafkaListenerContainerFactory yang mencipta tika ConcurrentKafkaListenerContainerFactory. Di sini kami menyuntik kilang pengguna ke dalam contoh kafkaListenerContainerFactory.

Seterusnya, kami akan mencipta kelas pengguna Kafka KafkaConsumer yang mendengar topik my-topic dan menerima mesej:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

Dalam kod di atas, kami menggunakan @KafkaListener Anotasi mengisytiharkan kaedah pengguna yang menerima mesej yang dibaca daripada topik my-topic. Di sini kami menetapkan ID kumpulan pengguna kepada my-group-id.

Kini, kami telah menyelesaikan penyediaan pengeluar dan pengguna Kafka. Kita boleh memulakan aplikasi menggunakan perintah mvn spring-boot:run dan menghantar permintaan POST ke titik akhir http://localhost:8080/send menggunakan arahan curl untuk menghantar mesej kepada Kafka. Kami kemudiannya boleh melihat mesej yang diterima oleh pengguna pada konsol. Ini ialah persediaan asas untuk menggunakan Spring Boot dan Kafka. Kita boleh berubah dan berkembang mengikut keperluan untuk memenuhi keperluan tertentu.

Atas ialah kandungan terperinci Bagaimana Spring Boot menyepadukan Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam