엔터프라이즈 수준 애플리케이션 아키텍처의 복잡성이 증가함에 따라 메시지 전송이 중요한 구성 요소가 되었습니다. 이때 카프카가 등장합니다. Kafka는 메시지 게시 및 구독을 지원하는 효율적이고 안정적인 분산 메시지 대기열로, 처리량이 매우 높고 대기 시간이 짧은 최신 엔터프라이즈급 메시징 시스템입니다. Kafka의 API에서는 공식 클라이언트가 여러 언어를 제공하지만 최근에는 Golang이 점점 더 널리 사용되고 있으므로 이 기사에서는 Golang을 구현 언어로 사용하여 Golang을 사용하여 Kafka를 구현하는 방법을 설명합니다.
1. 종속성
시작하기 전에 필수 종속성을 다운로드해야 합니다.
구체적인 사용 방법은 다음과 같습니다. :
go get github.com/Shopify/sarama
go get github.com/pkg/errors
2. 생산자 생성
Kafka의 API를 소개하기 전에 먼저 생산자 인스턴스를 생성해야 합니다. 생산자의 코드는 다음과 같습니다.
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
코드는 주로 다음 작업을 수행합니다.
3. 소비자 생성
두 번째로 소비자 인스턴스를 생성해야 합니다. 소비자 코드는 다음과 같습니다.
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
코드는 주로 다음 작업을 수행합니다.
4. 요약
위에서 Golang을 사용하여 Kafka의 생산자 및 소비자 부분을 구현했습니다. 분산 시스템 구현의 중요한 구성 요소 중 하나인 Kafka는 높은 동시성 및 분산 환경에 존재하는 메시지 시스템의 문제를 해결할 수 있습니다. Kafka는 우수한 지원 문서와 안정적인 커뮤니티를 갖추고 있어 실제 개발에 스트레스 없이 적용할 수 있습니다.
위 내용은 golang으로 kafka 구현하기의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!