Heim  >  Artikel  >  Java  >  So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

WBOY
WBOYnach vorne
2023-05-20 20:58:193580Durchsuche

Beschreibung

Dieses Projekt ist ein Springboot + Kafak-Integrationsprojekt und verwendet daher die Kafak-Verbrauchsanmerkung @KafkaListener in Springboot.

Konfigurieren Sie zunächst mehrere durch Kommas getrennte Themen in application.properties.

So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

Methode: Verwenden Sie den SpEl-Ausdruck von Spring, um Themen wie folgt zu konfigurieren: @KafkaListener(topics = „#{’${topics}’.split(’,’)}“)

So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

Run Das Programm und der Konsolendruckeffekt sind wie folgt:

So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

Da nur ein Verbraucherthread geöffnet ist, werden alle Themen und Partitionen diesem Thread zugewiesen.

Wenn Sie mehrere Verbraucherthreads öffnen möchten, um diese Themen zu konsumieren, fügen Sie den Parameter concurrency der @KafkaListener-Annotation zur Anzahl der gewünschten Verbraucher hinzu (beachten Sie, dass die Anzahl der Verbraucher kleiner oder gleich der Anzahl sein muss). Verbraucher, die Sie möchten) Die Summe der Anzahl der Partitionen aller Themen Ändern Sie das Thema und konsumieren Sie es, während das Programm ausgeführt wird?

So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

ans:

Nach dem Versuch kann diese Anforderung nicht mit der Annotation @KafkaListener erfüllt werden Consumer basierend auf den @KafkaListener-Annotationsinformationen, um das angegebene Thema zu konsumieren. Wenn das Thema während der Ausführung des Programms geändert wird, ist es dem Verbraucher nicht gestattet, die Verbraucherkonfiguration zu ändern und das Thema anschließend erneut zu abonnieren.

Aber wir können einen Kompromiss finden, der darin besteht, den topicPattern-Parameter von @KafkaListener für den Themenabgleich zu verwenden. So geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an

Ultimative Methode

Idee

Verwenden Sie die native Client-Abhängigkeit von Kafka, initialisieren Sie Verbraucher manuell und starten Sie Verbraucher-Threads, anstatt @KafkaListener zu verwenden.

Im Verbraucherthread ruft jeder Zyklus die neuesten Themeninformationen aus der Konfiguration, Datenbank oder anderen Konfigurationsquellen ab, vergleicht sie mit dem vorherigen Thema und abonniert das Thema erneut oder initialisiert den Verbraucher, wenn Änderungen auftreten.

Implementierung

Kafka-Client-Abhängigkeit hinzufügen (diese Testserver-Kafka-Version: 2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

Code

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}

Lassen Sie uns über die 72. Codezeile sprechen:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

Die obige Codezeile bedeutet: in 100 ms Warten Sie, bis der Kafka-Broker Daten zurückgibt. Der Supermarktparameter gibt an, wie lange es nach der Abfrage dauern kann, unabhängig davon, ob Daten verfügbar sind oder nicht.

Nachdem Sie das Thema geändert haben, müssen Sie warten, bis die von dieser Umfrage abgerufenen Nachrichten verarbeitet wurden, und Änderungen im Thema während der while-Schleife (true) erkennen, bevor Sie das Thema erneut abonnieren können Die Methode poll () lautet zu einem Zeitpunkt: 500, wie unten gezeigt, wird im Quellcode des Kafka-Clients festgelegt.

Wenn Sie diese Konfiguration anpassen möchten, können Sie die

Laufergebnisse beim Initialisieren des Verbrauchers hinzufügen (im getesteten Thema sind keine Daten vorhanden)

Hinweis: KafkaConsumer ist threadunsicher Verwenden Sie nicht eine KafkaConsumer-Instanz, um mehrere Verbraucher zu öffnen. Um mehrere Verbraucher zu öffnen, müssen Sie mehrere neue KafkaConsumer-Instanzen erstellen.

Das obige ist der detaillierte Inhalt vonSo geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen