>  기사  >  백엔드 개발  >  golang으로 kafka 구현하기

golang으로 kafka 구현하기

王林
王林원래의
2023-05-10 13:18:371538검색

엔터프라이즈 수준 애플리케이션 아키텍처의 복잡성이 증가함에 따라 메시지 전송이 중요한 구성 요소가 되었습니다. 이때 카프카가 등장합니다. Kafka는 메시지 게시 및 구독을 지원하는 효율적이고 안정적인 분산 메시지 대기열로, 처리량이 매우 높고 대기 시간이 짧은 최신 엔터프라이즈급 메시징 시스템입니다. Kafka의 API에서는 공식 클라이언트가 여러 언어를 제공하지만 최근에는 Golang이 점점 더 널리 사용되고 있으므로 이 기사에서는 Golang을 구현 언어로 사용하여 Golang을 사용하여 Kafka를 구현하는 방법을 설명합니다.

1. 종속성

시작하기 전에 필수 종속성을 다운로드해야 합니다.

  • sarama: Golang Kafka 클라이언트 라이브러리
  • pkg/errors: Go 표준 라이브러리의 오류 패키지를 캡슐화합니다.

구체적인 사용 방법은 다음과 같습니다. :

go get github.com/Shopify/sarama
go get github.com/pkg/errors

2. 생산자 생성

Kafka의 API를 소개하기 전에 먼저 생산자 인스턴스를 생성해야 합니다. 생산자의 코드는 다음과 같습니다.

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

코드는 주로 다음 작업을 수행합니다.

  • 생산자 구성: 생산자의 구성을 설정하고 분할 방법을 무작위 분할로 지정하고 모든 ISR 노드가 분할될 때까지 기다립니다. 메시지를 확인하고 전송 성공 후 파티션 및 오프셋을 반환하고 반환합니다.
  • 생산자 생성: 지정된 브로커 주소와 구성을 사용하여 생산자 인스턴스를 생성합니다.
  • 메시지 보내기: 메시지 제목과 내용을 포함하여 메시지를 작성하여 보냅니다.
  • 출력 결과: 결과 인쇄, 메시지 파티션 기록 및 오프셋.

3. 소비자 생성

두 번째로 소비자 인스턴스를 생성해야 합니다. 소비자 코드는 다음과 같습니다.

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

코드는 주로 다음 작업을 수행합니다.

  • 소비자 구성: 소비자를 구성하고 오류 반환 스위치를 설정합니다.
  • 소비자 생성: 지정된 브로커 주소 및 구성을 기반으로 소비자 인스턴스를 생성합니다.
  • 파티션 가져오기: 지정된 주제의 파티션을 가져옵니다.
  • 소비: 별도의 소비를 위해 각 파티션에 대해 고루틴을 엽니다.
  • 출력 결과: 소비된 메시지를 인쇄합니다.

4. 요약

위에서 Golang을 사용하여 Kafka의 생산자 및 소비자 부분을 구현했습니다. 분산 시스템 구현의 중요한 구성 요소 중 하나인 Kafka는 높은 동시성 및 분산 환경에 존재하는 메시지 시스템의 문제를 해결할 수 있습니다. Kafka는 우수한 지원 문서와 안정적인 커뮤니티를 갖추고 있어 실제 개발에 스트레스 없이 적용할 수 있습니다.

위 내용은 golang으로 kafka 구현하기의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.