ホームページ >Java >&#&チュートリアル >Spring Boot が Kafka を統合する方法

Spring Boot が Kafka を統合する方法

WBOY
WBOY転載
2023-06-02 14:18:351498ブラウズ

ステップ 1: 依存関係を追加する

pom.xml に次の依存関係を追加します:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

ステップ 2: Kafka を構成する

application.yml で 次の設定をファイルに追加します:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

ここでは、Kafka のサービス アドレスを localhost:9092 として設定し、コンシューマ グループ ID を my-group として設定します。そして、メッセージを読み取るための最も早いオフセットを設定します。プロデューサ側では、メッセージ シリアライザーを StringSerializer として構成しました。

ステップ 3: プロデューサーを作成する

次に、Kafka サーバーにメッセージを送信するための Kafka プロデューサーを作成する必要があります。ここでは、POST リクエストを受信し、Kafka にメッセージを送信するための RESTful API エンドポイントを作成します。

最初に、Kafka プロデューサーを構成するための KafkaProducerConfig クラスを作成します。

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

上記のコードでは、@Configuration## を使用します # アノテーションは宣言しますKafkaProducerConfig クラスを構成クラスとして使用します。次に、@Value 注釈を使用して、構成ファイルに bootstrap-servers プロパティを挿入します。

次に、Kafka プロデューサの構成を設定するための

ProducerConfigs メソッドを作成しました。ここでは、BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIG、および VALUE_SERIALIZER_CLASS_CONFIG プロパティを設定します。

次に、Kafka プロデューサー ファクトリを作成するための

ProducerFactory メソッドを作成しました。ここでは、DefaultKafkaProducerFactory クラスを使用し、設定を渡しました。

最後に、

KafkaTemplate インスタンスを作成するための kafkaTemplate メソッドを作成しました。ここでは、作成したばかりのプロデューサー ファクトリをパラメータとして使用し、KafkaTemplate インスタンスを返します。

次に、POST リクエストを受信し、Kafka にメッセージを送信する RESTful エンドポイントを作成します。ここでは、

@RestController アノテーションを使用して RESTful コントローラーを作成します。

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

上記のコードでは、

@Autowired アノテーションを KafkaTemplate に使用します。 インスタンスは KafkaController クラスに挿入されます。次に、Kafka にメッセージを送信するための sendMessage メソッドを作成しました。

ここでは、

kafkaTemplate.send メソッドを使用して、my-topic トピックにメッセージを送信します。 send メソッドは、結果の非同期処理のために ListenableFuture オブジェクトを返します。

ステップ 4: コンシューマーの作成

次に、Kafka サーバーからメッセージを受信する Kafka コンシューマーを作成します。ここでは、コンシューマ グループを作成し、

my-topic トピックからメッセージを読み取るように構成します。

最初に、Kafka コンシューマーを構成するための

KafkaConsumerConfig クラスを作成します。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

上記のコードでは、

@Configuration## を使用します # アノテーションは宣言しますKafkaConsumerConfig クラスを構成クラスとして設定し、@EnableKafka アノテーションを使用して Kafka を有効にします。 次に、

@Value

アノテーションを使用して、bootstrap-servers プロパティと consumer.group-id プロパティを構成ファイルに挿入します。 次に、Kafka コンシューマの構成を設定するための

consumerConfigs

メソッドを作成しました。ここでは、5 つのプロパティ BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIG、および VALUE_DESERIALIZER_CLASS_CONFIG を設定します。 次に、Kafka コンシューマ ファクトリを作成するための

consumerFactory

メソッドを作成しました。ここでは、DefaultKafkaConsumerFactory クラスを使用し、設定を渡しました。 最後に、

ConcurrentKafkaListenerContainerFactory

インスタンスを作成するための kafkaListenerContainerFactory メソッドを作成しました。ここでは、コンシューマ ファクトリを kafkaListenerContainerFactory インスタンスに挿入します。 次に、Kafka コンシューマー クラス

KafkaConsumer

を作成して、my-topic トピックをリッスンし、メッセージを受信します。上のコードでは、@KafkaListener アノテーションを使用して、

my-topic

トピックから読み取られたメッセージを受信するコンシューマ メソッドを宣言します。ここでは、コンシューマ グループ ID を my-group-id に設定します。 これで、Kafka のプロデューサーとコンシューマーのセットアップが完了しました。 mvn spring-boot:run コマンドを使用してアプリケーションを起動し、curl コマンドを使用して POST リクエストを

http://localhost:8080/send

エンドポイントに送信して、カフカへのメッセージ。その後、コンシューマーが受信したメッセージをコンソールで確認できます。これが Spring Boot と Kafka を使用するための基本的な設定です。特定のニーズを満たすために、必要に応じて変更および拡張できます。

以上がSpring Boot が Kafka を統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。