ホームページ  >  記事  >  Java  >  SpringBoot が Kafka 構成ツール クラスを統合する方法

SpringBoot が Kafka 構成ツール クラスを統合する方法

WBOY
WBOY転載
2023-05-12 21:58:111322ブラウズ

spring-kafka は、java バージョンの kafka クライアントと spring の統合に基づいています。簡単に操作できるようにさまざまなメソッドをカプセル化する KafkaTemplate を提供します。Apache の kafka クライアントをカプセル化し、クライアントの依存関係をインポートする必要はありません。

<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

YML 構成

kafka:
    #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址,
    #生产者配置
    producer:
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 1 # 消息发送重试次数
      #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
      #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
      #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量
      acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 #批量大小
      properties:
        linger:
          ms: 0 #提交延迟
      buffer-memory: 33554432 # 生产端缓冲区大小
    # 消费者配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 分组名称
      group-id: web
      enable-auto-commit: false
      #提交offset延时(接收到消息后多久提交offset)
      # auto-commit-interval: 1000ms
      #当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      properties:
        #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        session.timeout.ms: 15000
        #消费请求超时时间
        request.timeout.ms: 18000
      #批量消费每次最多消费多少条消息
      #每次拉取一条,一条条消费,当然是具体业务状况设置
      max-poll-records: 1
      # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒;
      heartbeat-interval: 6000
      # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错
      #client-id: mqtt
    listener:
      #消费端监听的topic不存在时,项目启动会报错(关掉)
      missing-topics-fatal: false
      #设置消费类型 批量消费 batch,单条消费:single
      type: single
      #指定容器的线程数,提高并发量
      #concurrency: 3
      #手动提交偏移量 manual达到一定数据后批量提交
      #ack-mode: manual
      ack-mode: MANUAL_IMMEDIATE #手動確認消息
        # 认证
    #properties:
      #security:
        #protocol: SASL_PLAINTEXT
      #sasl:
        #mechanism: SCRAM-SHA-256
        #jaas:config: &#39;org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";&#39;

シンプルなツール クラス、通常の使用に対応、テーマは変更不可

@Component
@Slf4j
public class KafkaUtils<K, V> {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] servers;

    /**
     * 获取连接
     * @return
     */
    private Admin getAdmin() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        // 正式环境需要添加账号密码
        return Admin.create(properties);
    }

    /**
     * 增加topic
     *
     * @param name      主题名字
     * @param partition 分区数量
     * @param replica   副本数量
     * @date 2022-06-23 chens
     */
    public R addTopic(String name, Integer partition, Integer replica) {
        Admin admin = getAdmin();
        if (replica > servers.length) {
            return R.error("副本数量不允许超过Broker数量");
        }
        try {
            NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString()));
            admin.createTopics(Collections.singleton(topic));
        } finally {
            admin.close();
        }
        return R.ok();
    }

    /**
     * 删除主题
     *
     * @param names 主题名字集合
     * @date 2022-06-23 chens
     */
    public void deleteTopic(List<String> names) {
        Admin admin = getAdmin();
        try {
            admin.deleteTopics(names);
        } finally {
            admin.close();
        }
    }

    /**
     * 查询所有主题
     *
     * @date 2022-06-24 chens
     */
    public Set<String> queryTopic() {
        Admin admin = getAdmin();
        try {
            ListTopicsResult topics = admin.listTopics();
            Set<String> set = topics.names().get();
            return set;
        } catch (Exception e) {
            log.error("查询主题错误!");
        } finally {
            admin.close();
        }
        return null;
    }

    // 向所有分区发送消息
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        return kafkaTemplate.send(topic, data);
    }
    
    // 指定key发送消息,相同key保证消息在同一个分区
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, key, data);
    }

    // 指定分区和key发送。
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, partition, key, data);
    }
}

送信メッセージ 非同期の使用

@GetMapping("/{topic}")
    public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException {

        ListenableFuture future = null;
        Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date());
        String s = JSON.toJSONString(user);
        KafkaUtils utils = new KafkaUtils();
        future = kafkaUtils.send(topic, s);
        // 异步回调,同步get,会等待 不推荐同步!
        future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送失败");
            }
            @Override
            public void onSuccess(Object result) {
                System.out.println("发送成功:" + result);
            }
        });
        return "发送成功";
    }

トピックの作成

ブローカーが auto.create.topics.enable を true に設定した場合 (デフォルトは true)、メッセージを受信するときに使用されます。クライアントからのメタデータ要求 トピックを作成します。

存在しないトピックを送信して利用すると、新しいトピックが作成されます。多くの場合、予期しないトピックが作成されると、多くの予期せぬ問題が発生します。この機能をオフにすることをお勧めします。

トピック トピックは、さまざまなタイプのメッセージを区別するために使用されます。実際、これらはさまざまなビジネス シナリオに適しています。デフォルトでは、メッセージは 1 週間保存されます。

同じトピック トピックの下で、デフォルトはパーティションです。つまり、消費できるコンシューマは 1 つだけです。消費容量を向上させたい場合は、パーティションを追加する必要があります。

同じトピックの複数のパーティションには、メッセージ (キー、値) を別のパーティションに分散する 3 つの方法、パーティションの指定、ハッシュ ルーティング、デフォルト、同じパーティション内のメッセージ ID は一意であり、順序付けられています;

コンシューマーがパーティション パーティション内のメッセージを消費する場合、メッセージの場所を識別するためにオフセットを使用します;

GroupId は、同じトピックの下で繰り返し消費される問題を解決するために使用されます。たとえば、消費を複数のコンシューマーが受信する必要がある場合は、次のようにすることができます。異なる GroupId を設定することで実現されます。

実際のメッセージは 1 つのコピーに保存されます。識別子を論理的に設定することによってのみ区別されます。システムは、Topic トピック -> GroupId グループ - およびパーティション パーティションの下にオフセットを記録します。消費されたかどうかを特定します。

メッセージ送信の高可用性 - クラスター モード、マルチコピーの実装。メッセージの送信は、acks 識別子を設定することで異なる可用性を実現できます。=0 の場合、送信が成功すれば OK です。;= 1、マスターは OK になる前に正常に応答し、=all、応答の半分以上が OK (実際の高可用性)

コンシューマー メッセージの高可用性 -- 自動識別オフサート モードをオフにすることができます。消費が完了した後、最初にメッセージをプルし、消費の高可用性を解決するためにオフセット位置を設定します。

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopic {
    // yml自定义主题,项目启动就创建,
    @Value("${spring.kafka.topic}")
    String topic;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] server;
    /**
     * 项目启动 初始化主题,如果存在不会覆盖主题的
     */
    @Bean
    public NewTopic batchTopic() {
        // 最大复制因子 <= 经纪人broker数量.
        return new NewTopic(topic, 10, (short) server.length);
    }
}

リスニング クラス、メッセージ。メッセージが監視するパーティション 1 を指定すると、また消費されます。

同じ方法で異なるトピックを監視し、ディスプレイスメント監視を指定することもできます。

同じグループは均等に消費し、異なるグループは繰り返し消費します。

1. ユニキャスト モード、コンシューマ グループは 1 つだけです

(1) トピックにはパーティションが 1 つだけあり、グループ内に複数のコンシューマがある場合、同じパーティション内のメッセージはグループ内の 1 人の消費者のみが消費できます。図 1 に示すように、コンシューマの数がパーティションの数を超えると、超過したコンシューマはアイドル状態になります。トピックとテストには、パーティションが 1 つとグループ G1 が 1 つだけあります。このグループには複数のコンシューマがあり、そのうちの 1 つだけが使用でき、他のコンシューマはアイドル状態です。

図 1

SpringBoot が Kafka 構成ツール クラスを統合する方法 (2) トピックには複数のパーティションがあり、グループ内には複数のコンシューマがあります。たとえば、test には 3 つのパーティションがあります。グループ内に 2 つのコンシューマーがいる場合、C0 は p0 と p1 のデータの消費に対応し、c1 は p2 のデータの消費に対応します。コンシューマーが 3 人の場合、1 つのコンシューマーは 1 つのパーティション内のデータの消費に対応します。図を図 2 と図 3 に示します。このモードはクラスター モードで非常に一般的です。たとえば、3 つのサービスを開始し、対応するトピックに 3 つのパーティションを設定できるため、並列消費が達成され、メッセージ処理の効率が向上します。効率を大幅に向上させることができます。

#図 2

SpringBoot が Kafka 構成ツール クラスを統合する方法

図 3

2. ブロードキャスト モード、複数のコンシューマー グループ SpringBoot が Kafka 構成ツール クラスを統合する方法

ブロードキャスト モードを実装したい場合は、1 つのコンシューマ グループがメッセージを消費した後、他のグループのコンシューマの消費にまったく影響を及ぼさないように、複数のコンシューマ グループを設定する必要があります。これがコンセプトです。放送の

(1) 複数のコンシューマ グループ、1 つのパーティション

このトピックのデータは、複数のコンシューマ グループによって同時に消費されます。コンシューマ グループに複数のコンシューマがある場合、それを消費できるのは複数のコンシューマ グループのみです。図 4 に示すように、1 つのコンシューマ:

図 4

(2) 複数のコンシューマ グループ、複数のパーティション SpringBoot が Kafka 構成ツール クラスを統合する方法

このトピックは、複数のコンシューマ グループによって複数回使用できます。図 5 に示すように、コンシューマ グループ内では、各コンシューマはトピック内の 1 つ以上のパーティションに対応して並行して使用できます。 ##

注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。

@Component
@Slf4j
public class Consumer {
    // 监听主题 分组a
    @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a")
    public  void  getMessage(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组a
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a")
    public  void getMessage2(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage3(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage4(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }

    // 指定监听分区1的消息
    @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})})
    public void getMessage5(ConsumerRecord message, Acknowledgment ack) {
        Long id = JSONObject.parseObject(message.value().toString()).getLong("id");
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }
    
    /**
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * 注意:topics和topicPartitions不能同时使用;
     **/
    @KafkaListener(id = "c1",groupId = "c",topicPartitions = {
            @TopicPartition(topic = "t1", partitions = { "0" }),
            @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))})
    public void getMessage6(ConsumerRecord record,Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    
    /**        
     * 批量消费监听goods变更消息
     * yml配置listener:type 要改为batch
     * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费)
     * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费
     */
    @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4")
    public void getMessage7(List<ConsumerRecord<String, String>> records){
        for (ConsumerRecord<String, String> msg:records) {
            GoodsChangeMsg changeMsg = null;
            try {
                changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
                syncGoodsProcessor.handle(changeMsg);
            }catch (Exception exception) {
                log.error("解析失败{}", msg, exception);
            }
        }
    }
}

以上がSpringBoot が Kafka 構成ツール クラスを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。