spring-kafka는 Kafka 클라이언트의 Java 버전과 spring의 통합을 기반으로 하며 KafkaTemplate을 제공합니다. KafkaTemplate은 Apache의 Kafka 클라이언트를 캡슐화하고 클라이언트 종속성을 가져올 필요가 없습니다. YML 구성
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>일반적인 용도에 맞는 간단한 도구 클래스입니다. 주제는 수정할 수 없습니다.
kafka: #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址, #生产者配置 producer: # Kafka提供的序列化和反序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 1 # 消息发送重试次数 #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果 #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。 #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量 acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 #批量大小 properties: linger: ms: 0 #提交延迟 buffer-memory: 33554432 # 生产端缓冲区大小 # 消费者配置 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 分组名称 group-id: web enable-auto-commit: false #提交offset延时(接收到消息后多久提交offset) # auto-commit-interval: 1000ms #当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest properties: #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 15000 #消费请求超时时间 request.timeout.ms: 18000 #批量消费每次最多消费多少条消息 #每次拉取一条,一条条消费,当然是具体业务状况设置 max-poll-records: 1 # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒; heartbeat-interval: 6000 # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错 #client-id: mqtt listener: #消费端监听的topic不存在时,项目启动会报错(关掉) missing-topics-fatal: false #设置消费类型 批量消费 batch,单条消费:single type: single #指定容器的线程数,提高并发量 #concurrency: 3 #手动提交偏移量 manual达到一定数据后批量提交 #ack-mode: manual ack-mode: MANUAL_IMMEDIATE #手動確認消息 # 认证 #properties: #security: #protocol: SASL_PLAINTEXT #sasl: #mechanism: SCRAM-SHA-256 #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'비동기를 사용하여 메시지를 보냅니다
@Component @Slf4j public class KafkaUtils<K, V> { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.bootstrap-servers}") String[] servers; /** * 获取连接 * @return */ private Admin getAdmin() { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); // 正式环境需要添加账号密码 return Admin.create(properties); } /** * 增加topic * * @param name 主题名字 * @param partition 分区数量 * @param replica 副本数量 * @date 2022-06-23 chens */ public R addTopic(String name, Integer partition, Integer replica) { Admin admin = getAdmin(); if (replica > servers.length) { return R.error("副本数量不允许超过Broker数量"); } try { NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString())); admin.createTopics(Collections.singleton(topic)); } finally { admin.close(); } return R.ok(); } /** * 删除主题 * * @param names 主题名字集合 * @date 2022-06-23 chens */ public void deleteTopic(List<String> names) { Admin admin = getAdmin(); try { admin.deleteTopics(names); } finally { admin.close(); } } /** * 查询所有主题 * * @date 2022-06-24 chens */ public Set<String> queryTopic() { Admin admin = getAdmin(); try { ListTopicsResult topics = admin.listTopics(); Set<String> set = topics.names().get(); return set; } catch (Exception e) { log.error("查询主题错误!"); } finally { admin.close(); } return null; } // 向所有分区发送消息 public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { return kafkaTemplate.send(topic, data); } // 指定key发送消息,相同key保证消息在同一个分区 public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) { return kafkaTemplate.send(topic, key, data); } // 指定分区和key发送。 public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) { return kafkaTemplate.send(topic, partition, key, data); } }주제를 만듭니다브로커 측에서 auto.create.topics를 구성합니다. true로 설정하면(기본값은 true) 클라이언트의 메타데이터 요청이 수신될 때 주제가 생성됩니다. 존재하지 않는 주제로 전송하고 소비하면 새로운 주제가 생성되는 경우가 많습니다. 예상치 못한 주제 생성으로 인해 예상치 못한 문제가 발생하는 경우가 많으므로 이 기능을 끄는 것이 좋습니다. 주제 테마는 실제로 다양한 비즈니스 시나리오에 적합합니다. 기본적으로 메시지는 일주일 동안 저장됩니다. 동일한 주제 테마에서 기본값은 파티션 파티션입니다. 소비자가 하나만 있을 수 있다는 의미입니다. 소비 용량을 늘리려면 파티션을 추가해야 합니다. 동일한 주제의 여러 파티션의 경우 메시지(키, 값)를 다른 파티션에 배포하는 세 가지 방법이 있습니다. , 지정된 파티션, HASH 라우팅, 기본값, 동일 파티션의 메시지 ID는 고유하고 순차적입니다. 소비자는 파티션의 메시지를 소비할 때 오프셋을 사용하여 메시지 위치를 식별합니다. 소비 욕구 등 동일한 주제에 대한 반복 소비 문제 여러 소비자가 수신할 때 서로 다른 GroupId를 설정하면 가능합니다. 실제 메시지는 하나의 사본에 저장되며, 논리적인 설정에 의해서만 구별됩니다. 시스템은 파티션 아래의 주제 테마 –》GroupId 그룹화–》 오프셋에 이를 기록하여 사용 여부를 식별합니다. 메시지 전송의 높은 가용성 - 클러스터 모드, 다중 복사 구현; acks 플래그를 설정하면 메시지 제출이 다른 가용성을 얻을 수 있습니다. =1이면 전송이 성공합니다. 마스터가 성공적으로 응답합니다 Only OK, =all인 경우 응답의 절반 이상이 OK입니다(실제 고가용성) 메시지 소비의 높은 가용성 - 자동 식별 오프셋 모드를 끄고 메시지를 먼저 가져온 다음 설정할 수 있습니다. 소비가 완료된 후 오프셋 위치, 소비의 고가용성을 해결하기 위해
@GetMapping("/{topic}") public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException { ListenableFuture future = null; Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date()); String s = JSON.toJSONString(user); KafkaUtils utils = new KafkaUtils(); future = kafkaUtils.send(topic, s); // 异步回调,同步get,会等待 不推荐同步! future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); } @Override public void onSuccess(Object result) { System.out.println("发送成功:" + result); } }); return "发送成功"; }Listening 클래스, 메시지는 각 그룹에서 한 명의 Consumer만 해당 메시지가 zone 1에 있는 경우 지정된 파티션 1 청취자도 이를 소비합니다. 소비합니다
같은 방식으로 다양한 메시지를 들을 수도 있습니다. 주제, 지정된 변위 모니터링
같은 그룹은 고르게 소비하고, 다른 그룹은 반복적으로 소비합니다. 1. 유니캐스트 모드에는 소비자 그룹이 하나만 있습니다 (1) 주제에는 하나의 파티션만 있습니다. 그룹에 여러 소비자가 있는 경우 동일한 파티션의 메시지는 그룹 중 하나만 보낼 수 있습니다. 소비자 소비. 소비자 수가 파티션 수를 초과하면 그림 1과 같이 초과 소비자는 유휴 상태가 됩니다. 주제와 테스트에는 단 하나의 파티션과 단 하나의 그룹인 G1만 있습니다. 이 그룹에는 여러 소비자가 있으며 그 중 하나만 사용할 수 있고 나머지는 유휴 상태입니다.(2) 주제에 여러 파티션이 있고 그룹에 여러 소비자가 있습니다. 예를 들어 테스트에는 3개의 파티션이 있고 그룹에 2명의 소비자가 있으면 해당 C0이 될 수 있습니다. p0, p1, c1의 데이터는 p2의 데이터를 소비하는 것에 해당하며, 소비자가 3명인 경우 한 명의 소비자가 하나의 파티션에서 데이터를 소비하는 것에 해당합니다. 다이어그램은 그림 2와 그림 3에 나와 있습니다. 이 모드는 클러스터 모드에서 매우 일반적입니다. 예를 들어, 3개의 서비스를 시작하고 해당 주제에 대해 3개의 파티션을 설정할 수 있으므로 병렬 소비가 달성되고 메시지 처리 효율성이 향상됩니다. 효율성을 크게 향상시킬 수 있습니다.
그림 2
그림 3
2. 브로드캐스트 모드, 다중 소비자 그룹
브로드캐스트 모드를 구현하려면 여러 소비자 그룹을 설정해야 하므로 하나의 소비자 그룹이 소비 이 메시지를 마친 후에는 다른 그룹의 소비자 소비에 전혀 영향을 미치지 않습니다. 이것이 방송의 개념입니다.
(1) 여러 소비자 그룹, 1개의 파티션이 주제의 데이터는 여러 소비자 그룹에서 동시에 소비됩니다. 소비자 그룹에 여러 소비자가 있으면 다음과 같이 한 명의 소비자만 사용할 수 있습니다. 그림 4:그림 4
(2) 다중 소비자 그룹, 다중 파티션
이 항목의 데이터는 한 소비자에서 여러 소비자 그룹에 의해 여러 번 소비될 수 있습니다. 그룹 내에서 각 소비자는 그림 5와 같이 주제 내의 하나 이상의 파티션에 해당하는 병렬:
注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。
@Component @Slf4j public class Consumer { // 监听主题 分组a @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a") public void getMessage(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组a @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a") public void getMessage2(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage3(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage4(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 指定监听分区1的消息 @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})}) public void getMessage5(ConsumerRecord message, Acknowledgment ack) { Long id = JSONObject.parseObject(message.value().toString()).getLong("id"); //确认收到消息//确认收到消息 ack.acknowledge(); } /** * @Title 指定topic、partition、offset消费 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8 * 注意:topics和topicPartitions不能同时使用; **/ @KafkaListener(id = "c1",groupId = "c",topicPartitions = { @TopicPartition(topic = "t1", partitions = { "0" }), @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}) public void getMessage6(ConsumerRecord record,Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } /** * 批量消费监听goods变更消息 * yml配置listener:type 要改为batch * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费) * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费 */ @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4") public void getMessage7(List<ConsumerRecord<String, String>> records){ for (ConsumerRecord<String, String> msg:records) { GoodsChangeMsg changeMsg = null; try { changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class); syncGoodsProcessor.handle(changeMsg); }catch (Exception exception) { log.error("解析失败{}", msg, exception); } } } }
위 내용은 SpringBoot가 Kafka 구성 도구 클래스를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!