首頁 >Java >java教程 >springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

WBOY
WBOY轉載
2023-05-20 20:58:193650瀏覽

說明

本項目為springboot kafak的整合項目,故其用了springboot中對kafak的消費註解@KafkaListener

首先,application.properties中配置用逗號隔開的多個topic。

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

方法:利用Spring的SpEl表達式,將topics 設定為:@KafkaListener(topics = “#{’${topics}’.split(’ ,’)}”)

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

運行程序,console打印的效果如下:

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

因為只開了一條消費者線程,所以所有的topic和分區都分配給這條線程。

如果你想開多條消費者線程去消費這些topic,添加@KafkaListener註解的參數concurrency的值為自己想要的消費者個數即可(注意,消費者數要小於等於你開的所有topic的分區數總和)

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

#運行程序,console打印的效果如下:

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

總結大家問的最多的一個問題

如何在程式運作的過程中,改變topic,消費者能夠消費修改後的topic?

ans: 經過嘗試,使用@KafkaListener註解實作不了這個需求,在程式啟動的時候,程式就會根據@KafkaListener的註解資訊初始化好消費者去消費指定好的topic。如果在程式運作的過程中,修改topic,不會讓此消費者修改消費者的配置再重新訂閱topic的。

不過我們可以有個摺疊中的辦法,就是利用@KafkaListener的topicPattern參數來進行topic比對。

終極方法

想法

使用 Kafka 原生客戶依賴,手動初始化消費者並啟動消費者線程,而不是使用 @KafkaListener。

在消費者線程中,每次循環都從配置、資料庫或其他配置來源獲取最新的topic信息,與先前的topic比較,如果發生變化,重新訂閱topic或初始化消費者。

實作

加入kafka客戶端依賴(本次測試服務端kafka版本:2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

程式碼

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}

說一下第72行程式碼:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

上面這行程式碼表示:在100ms內等待Kafka的broker回傳資料.超市參數指定poll在多久之後可以回,不管有沒有可用的資料都要回傳。

在修改topic後,必須等到此次poll拉取的訊息處理完,while(true)循環的時候偵測topic發生變化,才能重新訂閱topic.

poll()方法一次拉取得訊息數預設為:500,如下圖,kafka客戶端原始碼中設定的。

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

如果想自訂此配置,可在初始化消費者時加入

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

執行結果(測試的topic中都無資料)

springboot+kafka中@KafkaListener動態指定多個topic怎麼實現

注意:KafkaConsumer是線程不安全的,不要用一個KafkaConsumer實例開啟多個消費者,要開啟多個消費者,需要new 多個KafkaConsumer實例。

以上是springboot+kafka中@KafkaListener動態指定多個topic怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除