>  기사  >  백엔드 개발  >  Golang과 Kafka를 함께 사용하는 방법

Golang과 Kafka를 함께 사용하는 방법

PHPz
PHPz원래의
2023-04-13 18:34:171309검색

Kafka는 빅 데이터 애플리케이션에서 실시간 데이터 스트림 처리 애플리케이션을 구축하는 데 자주 사용되는 오픈 소스 분산 메시지 대기열입니다. Golang은 Google이 개발한 프로그래밍 언어로 효율적인 동시성, 강력한 라이브러리 및 생태계로 유명합니다. 그렇다면 Golang을 사용하여 Kafka와 결합하는 방법은 무엇입니까?

먼저 github.com/Shopify/sarama 패키지를 가져와야 합니다. Kafka를 지원하는 Golang 클라이언트 라이브러리입니다. 설치 프로세스 중에 다음 명령을 실행해야 합니다.

go get github.com/Shopify/sarama

다음으로 생산자를 생성해야 합니다. 먼저 구성을 만듭니다.

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true

여기에서는 생산자가 모든 ACK를 기다리고 최대 5번의 재시도를 시도한 후 성공 후 생산자에게 성공 메시지를 반환하도록 설정합니다.

다음으로 생산자 인스턴스를 생성해야 합니다.

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()

Kafka에 연결하기 위한 서비스 엔드포인트로 Kafka 브로커 주소를 지정해야 합니다. 여기서는 로컬 Kafka 서버에 연결합니다. 또한 .Close() 메서드를 호출하여 생산자가 종료될 때 정리되도록 합니다. .Close()方法,以确保生产者退出时会清理。

现在我们已经准备好了开始向Kafka主题发布消息:

msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello World!"),
}

part, offset, err := producer.SendMessage(msg)
if err != nil {
    fmt.Printf("Error publishing message: %v", err)
} else {
    fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset)
}

在这个例子中,我们发布了一个消息到名为“test”的主题中。如果没有错误,它会打印出成功发布的分区和偏移量。

现在我们已经创建了一个生产者,向Kafka发布了一条消息。接下来,我们来看一下如何创建一个消费者。

首先,我们需要创建消费者配置:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true

此处我们设定了接收错误。

接下来,我们需要创建一个消费者实例:

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer consumer.Close()

这里我们同样指定了一个Kafka broker地址。我们还需要调用.Close()方法来确保消费者退出时会清理。

现在我们已经准备好读取Kafka主题的消息:

partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
    panic(err)
}
defer partitionConsumer.Close()

for {
    select {
    case msg := <-partitionConsumer.Messages():
        fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    case err := <-partitionConsumer.Errors():
        fmt.Println("Error: ", err.Error())
    }
}

在这个例子中,我们订阅了名为“test”的主题。然后我们读取第一个分区的偏移量。我们然后在一个循环中无限读取来自该分区的消息。循环中的select

이제 Kafka 주제에 메시지 게시를 시작할 준비가 되었습니다.

rrreee

이 예에서는 "test"라는 주제에 메시지를 게시합니다. 오류가 없으면 성공적으로 게시된 파티션과 오프셋을 인쇄합니다. 🎜🎜이제 Kafka에 메시지를 게시하는 프로듀서를 만들었습니다. 다음으로 컨슈머를 생성하는 방법을 살펴보겠습니다. 🎜🎜먼저 소비자 구성을 생성해야 합니다. 🎜rrreee🎜여기서 수신 오류를 설정합니다. 🎜🎜다음으로 소비자 인스턴스를 생성해야 합니다. 🎜rrreee🎜여기서 Kafka 브로커 주소도 지정합니다. 또한 소비자가 종료될 때 정리되도록 .Close() 메서드를 호출해야 합니다. 🎜🎜이제 Kafka 주제의 메시지를 읽을 준비가 되었습니다. 🎜rrreee🎜이 예에서는 "test"라는 주제를 구독합니다. 그런 다음 첫 번째 파티션의 오프셋을 읽습니다. 그런 다음 루프에서 해당 파티션의 메시지를 무한히 읽습니다. 루프의 select 문은 항상 메시지와 오류 채널을 수신하고 각각 인쇄합니다. 🎜🎜지금까지 Golang과 Kafka를 결합하여 사용하는 방법을 소개했습니다. 이 간단한 예를 통해 Golang과 Kafka의 기본 사용법을 익혔을 것입니다. 🎜

위 내용은 Golang과 Kafka를 함께 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.