pom.xml に次の依存関係を追加します:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
application.yml で
次の設定をファイルに追加します:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
ここでは、Kafka のサービス アドレスを localhost:9092
として設定し、コンシューマ グループ ID を my-group
として設定します。そして、メッセージを読み取るための最も早いオフセットを設定します。プロデューサ側では、メッセージ シリアライザーを StringSerializer
として構成しました。
次に、Kafka サーバーにメッセージを送信するための Kafka プロデューサーを作成する必要があります。ここでは、POST リクエストを受信し、Kafka にメッセージを送信するための RESTful API エンドポイントを作成します。
最初に、Kafka プロデューサーを構成するための KafkaProducerConfig
クラスを作成します。
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
上記のコードでは、@Configuration## を使用します # アノテーションは宣言します
KafkaProducerConfig クラスを構成クラスとして使用します。次に、
@Value 注釈を使用して、構成ファイルに
bootstrap-servers プロパティを挿入します。
ProducerConfigs メソッドを作成しました。ここでは、
BOOTSTRAP_SERVERS_CONFIG、
KEY_SERIALIZER_CLASS_CONFIG、および
VALUE_SERIALIZER_CLASS_CONFIG プロパティを設定します。
ProducerFactory メソッドを作成しました。ここでは、
DefaultKafkaProducerFactory クラスを使用し、設定を渡しました。
KafkaTemplate インスタンスを作成するための
kafkaTemplate メソッドを作成しました。ここでは、作成したばかりのプロデューサー ファクトリをパラメータとして使用し、
KafkaTemplate インスタンスを返します。
@RestController アノテーションを使用して RESTful コントローラーを作成します。
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }上記のコードでは、
@Autowired アノテーションを
KafkaTemplate に使用します。 インスタンスは
KafkaController クラスに挿入されます。次に、Kafka にメッセージを送信するための
sendMessage メソッドを作成しました。
kafkaTemplate.send メソッドを使用して、
my-topic トピックにメッセージを送信します。 send メソッドは、結果の非同期処理のために
ListenableFuture オブジェクトを返します。
my-topic トピックからメッセージを読み取るように構成します。
KafkaConsumerConfig クラスを作成します。
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }上記のコードでは、
@Configuration## を使用します # アノテーションは宣言しますKafkaConsumerConfig
クラスを構成クラスとして設定し、@EnableKafka
アノテーションを使用して Kafka を有効にします。 次に、
アノテーションを使用して、bootstrap-servers
プロパティと consumer.group-id
プロパティを構成ファイルに挿入します。 次に、Kafka コンシューマの構成を設定するための
メソッドを作成しました。ここでは、5 つのプロパティ BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
、および VALUE_DESERIALIZER_CLASS_CONFIG
を設定します。 次に、Kafka コンシューマ ファクトリを作成するための
メソッドを作成しました。ここでは、DefaultKafkaConsumerFactory
クラスを使用し、設定を渡しました。 最後に、
インスタンスを作成するための kafkaListenerContainerFactory
メソッドを作成しました。ここでは、コンシューマ ファクトリを kafkaListenerContainerFactory
インスタンスに挿入します。 次に、Kafka コンシューマー クラス
を作成して、my-topic
トピックをリッスンし、メッセージを受信します。上のコードでは、@KafkaListener
アノテーションを使用して、
トピックから読み取られたメッセージを受信するコンシューマ メソッドを宣言します。ここでは、コンシューマ グループ ID を my-group-id
に設定します。 これで、Kafka のプロデューサーとコンシューマーのセットアップが完了しました。
mvn spring-boot:run
コマンドを使用してアプリケーションを起動し、curl コマンドを使用して POST リクエストを
エンドポイントに送信して、カフカへのメッセージ。その後、コンシューマーが受信したメッセージをコンソールで確認できます。これが Spring Boot と Kafka を使用するための基本的な設定です。特定のニーズを満たすために、必要に応じて変更および拡張できます。
以上がSpring Boot が Kafka を統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。