spring-kafka 是基於java版的kafka client與spring的集成,提供了KafkaTemplate,封裝了各種方法,方便操作,它封裝了apache的kafka-client,不需要再導入client依賴
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
YML配置
kafka: #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址, #生产者配置 producer: # Kafka提供的序列化和反序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 1 # 消息发送重试次数 #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果 #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。 #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量 acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 #批量大小 properties: linger: ms: 0 #提交延迟 buffer-memory: 33554432 # 生产端缓冲区大小 # 消费者配置 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 分组名称 group-id: web enable-auto-commit: false #提交offset延时(接收到消息后多久提交offset) # auto-commit-interval: 1000ms #当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest properties: #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 15000 #消费请求超时时间 request.timeout.ms: 18000 #批量消费每次最多消费多少条消息 #每次拉取一条,一条条消费,当然是具体业务状况设置 max-poll-records: 1 # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒; heartbeat-interval: 6000 # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错 #client-id: mqtt listener: #消费端监听的topic不存在时,项目启动会报错(关掉) missing-topics-fatal: false #设置消费类型 批量消费 batch,单条消费:single type: single #指定容器的线程数,提高并发量 #concurrency: 3 #手动提交偏移量 manual达到一定数据后批量提交 #ack-mode: manual ack-mode: MANUAL_IMMEDIATE #手動確認消息 # 认证 #properties: #security: #protocol: SASL_PLAINTEXT #sasl: #mechanism: SCRAM-SHA-256 #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
#簡單工具類,能滿足正常使用,主題是無法修改的
@Component @Slf4j public class KafkaUtils<K, V> { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.bootstrap-servers}") String[] servers; /** * 获取连接 * @return */ private Admin getAdmin() { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); // 正式环境需要添加账号密码 return Admin.create(properties); } /** * 增加topic * * @param name 主题名字 * @param partition 分区数量 * @param replica 副本数量 * @date 2022-06-23 chens */ public R addTopic(String name, Integer partition, Integer replica) { Admin admin = getAdmin(); if (replica > servers.length) { return R.error("副本数量不允许超过Broker数量"); } try { NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString())); admin.createTopics(Collections.singleton(topic)); } finally { admin.close(); } return R.ok(); } /** * 删除主题 * * @param names 主题名字集合 * @date 2022-06-23 chens */ public void deleteTopic(List<String> names) { Admin admin = getAdmin(); try { admin.deleteTopics(names); } finally { admin.close(); } } /** * 查询所有主题 * * @date 2022-06-24 chens */ public Set<String> queryTopic() { Admin admin = getAdmin(); try { ListTopicsResult topics = admin.listTopics(); Set<String> set = topics.names().get(); return set; } catch (Exception e) { log.error("查询主题错误!"); } finally { admin.close(); } return null; } // 向所有分区发送消息 public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { return kafkaTemplate.send(topic, data); } // 指定key发送消息,相同key保证消息在同一个分区 public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) { return kafkaTemplate.send(topic, key, data); } // 指定分区和key发送。 public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) { return kafkaTemplate.send(topic, partition, key, data); } }
發送訊息使用非同步
@GetMapping("/{topic}") public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException { ListenableFuture future = null; Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date()); String s = JSON.toJSONString(user); KafkaUtils utils = new KafkaUtils(); future = kafkaUtils.send(topic, s); // 异步回调,同步get,会等待 不推荐同步! future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); } @Override public void onSuccess(Object result) { System.out.println("发送成功:" + result); } }); return "发送成功"; }
建立主題
如果broker端設定auto.create.topics.enable為true(預設為true),當收到客戶端的元資料要求時則會建立topic。
向一個不存在的主題發送和消費都會創建一個新的主題,很多時候,非預期的創建主題,會導致很多意想不到的問題,建議關掉該特性。
Topic主題用來區分不同類型的訊息,實際上也就是適用於不同的業務場景,預設訊息儲存一週時間;
同一個Topic主題下,預設是一個partition分區,也就是只能有一個消費者來消費,如果想提升消費能力,就需要增加分區;
同一個Topic的多個分區,可以有三種方式分派訊息(key,value)到不同的分區,指定分區、HASH路由、默認,同一個分區內的消息ID唯一,併順序;
消費者消費partition分區內的消息時,是透過offsert來標識訊息的位置;
GroupId用來解決同一個Topic主題下重複消費問題,例如一條消費需要多個消費者接收到,就可以透過設定不同的GroupId實現,
實際訊息是存一份的,只是透過邏輯上設定標識來區分,系統會記錄Topic主題下–》GroupId分組下–》partition分區下的offsert,來識別是否消費過。
發送訊息的高可用—
集群模式,多副本方式實現;一則訊息的提交,可能透過設定acks標識實現不同的可用性,=0時,發送成功就OK ;=1時,master成功響應才OK,=all時,一半以上的響應才OK(真正的高可用)
消費訊息的高可用—
#可以關閉自動標識offsert模式,先拉取訊息,消費完成後,再去設定offsert位置,來解決消費高可用
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopic { // yml自定义主题,项目启动就创建, @Value("${spring.kafka.topic}") String topic; @Value("${spring.kafka.bootstrap-servers}") String[] server; /** * 项目启动 初始化主题,如果存在不会覆盖主题的 */ @Bean public NewTopic batchTopic() { // 最大复制因子 <= 经纪人broker数量. return new NewTopic(topic, 10, (short) server.length); } }
監聽類,一條訊息,各分組內的消費者只有一個消費者消費一次,如果訊息在1區,指定分區1監聽也會消費
也可以同個方法監聽不同的主題,指定位移監聽
同組會均勻消費,不同組會重複消費。
(1)topic只有1個partition,該群組內有多個消費者時,此時同一個partition內的消息只能被該組中的一個consumer消費。當消費者數量多於partition數量時,多餘的消費者是處於空閒狀態的,如圖1所示。 topic,test只有一個partition,而且只有1個group,G1,該group內有多個consumer,只能被其中一個消費者消費,其他的處於空閒狀態。
圖一
(2)該topic有多個partition,該群組內有多個消費者,例如test 有3個partition,該組內有2個消費者,那麼可能就是C0對應消費p0,p1內的數據,c1對應消費p2的數據;如果有3個消費者,就是一個消費者對應消費一個partition內的數據了。圖解分別如圖2,圖3.這種模式在集群模式下使用是非常普遍的,比如我們可以起3個服務,對應的topic設置3個partiition,這樣就可以實現並行消費,大大提高處理消息的效率。
圖二
#圖三
如果想實現廣播的模式就需要設定多個消費者群組,這樣當一個消費者群組消費完這個訊息後,絲毫不影響其他群組內的消費者進行消費,這就是廣播的概念。
(1)多個消費者群組,1個partition
該topic內的資料被多個消費者群組同時消費,當某個消費者群組有多個消費者時也只能被一個消費者消費,如圖4所示:
圖四
(2)多個消費者群組,多個partition
該topic內的資料可被多個消費者群組多次消費,在一個消費者群組內,每個消費者又可對應該topic內的一個或多個partition並行消費,如圖五:
#注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。
@Component @Slf4j public class Consumer { // 监听主题 分组a @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a") public void getMessage(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组a @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a") public void getMessage2(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage3(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage4(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 指定监听分区1的消息 @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})}) public void getMessage5(ConsumerRecord message, Acknowledgment ack) { Long id = JSONObject.parseObject(message.value().toString()).getLong("id"); //确认收到消息//确认收到消息 ack.acknowledge(); } /** * @Title 指定topic、partition、offset消费 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8 * 注意:topics和topicPartitions不能同时使用; **/ @KafkaListener(id = "c1",groupId = "c",topicPartitions = { @TopicPartition(topic = "t1", partitions = { "0" }), @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}) public void getMessage6(ConsumerRecord record,Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } /** * 批量消费监听goods变更消息 * yml配置listener:type 要改为batch * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费) * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费 */ @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4") public void getMessage7(List<ConsumerRecord<String, String>> records){ for (ConsumerRecord<String, String> msg:records) { GoodsChangeMsg changeMsg = null; try { changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class); syncGoodsProcessor.handle(changeMsg); }catch (Exception exception) { log.error("解析失败{}", msg, exception); } } } }
以上是SpringBoot怎麼整合Kafka配置工具類的詳細內容。更多資訊請關注PHP中文網其他相關文章!