Home >Java >javaTutorial >How to dynamically specify multiple topics with @KafkaListener in springboot+kafka
This project is a springboot kafak integration project, so it uses the kafak consumption annotation @KafkaListener in springboot
First, configure application.properties separated by commas Multiple topics.
Method: Use Spring’s SpEl expression to configure topics as: @KafkaListener(topics = “#{’${topics}’.split(’ ,’)}”)
Run the program, and the console print effect is as follows:
Because it is only open A consumer thread, so all topics and partitions are assigned to this thread.
If you want to open multiple consumer threads to consume these topics, add the parameter concurrency of the @KafkaListener annotation. The value can be the number of consumers you want (note that consumption The number of partitions must be less than or equal to the total number of partitions of all topics you have opened)
Run the program, and the console printout will be as follows:
How to change the topic while the program is running, so that consumers can consume the modified topic?
ans: After trying, this requirement cannot be achieved using the @KafkaListener annotation. When the program starts, the program will initialize the consumer based on the @KafkaListener annotation information to consume the specified topic. If the topic is modified while the program is running, the consumer will not be allowed to modify the consumer configuration and then re-subscribe to the topic.
However, we can have a compromise, which is to use the topicPattern parameter of @KafkaListener for topic matching.
Use Kafka native client dependency, manually initialize the consumer and start the consumer thread instead of using @KafkaListener.
In the consumer thread, each cycle obtains the latest topic information from the configuration, database or other configuration sources, compares it with the previous topic, and if changes occur, resubscribe to the topic or initialize the consumer.
Add kafka client dependency (this test server kafka version: 2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
Let’s talk about it Line 72 of code:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
The above line of code means: wait for Kafka's broker to return data within 100ms. The supermarket parameter specifies how long after poll can return, regardless of whether there is available data or not.
After modifying the topic, you must wait until the messages pulled by this poll are processed, and detect changes in the topic during the while (true) loop before you can re-subscribe to the topic.
poll() method The default number of messages obtained in one pull is: 500, as shown in the figure below, set in the kafka client source code.
If you want to customize this configuration, you can add
running results (test topic) when initializing the consumer There is no data in all)
Note: KafkaConsumer is thread-unsafe. Do not use one KafkaConsumer instance to open multiple consumers. To open multiple consumers, you need new KafkaConsumer instance.
The above is the detailed content of How to dynamically specify multiple topics with @KafkaListener in springboot+kafka. For more information, please follow other related articles on the PHP Chinese website!