首頁  >  文章  >  後端開發  >  用golang實作kafka

用golang實作kafka

王林
王林原創
2023-05-10 13:18:371528瀏覽

隨著企業級應用程式架構的逐漸複雜化,訊息傳輸變成了一個至關重要的組成部分。這時Kafka便嶄露頭角。 Kafka是一款高效可靠的分散式訊息佇列,它支援訊息的發布和訂閱,是現代化的企業級訊息系統,且擁有非常高的吞吐量和低延遲。在Kafka的API中,儘管官方提供了多種語言的客戶端,但近年來Golang的應用越來越廣泛,所以本文以Golang作為實現語言,講解如何用Golang實現Kafka。

一、依賴

在開始之前,需要先下載所需的依賴:

  • sarama:Golang Kafka客戶端程式庫
  • pkg /errors:對Go標準函式庫的錯誤套件進行封裝

具體使用方法如下:

go get github.com/Shopify/sarama
go get github.com/ pkg/errors

二、建立一個生產者

在介紹Kafka的API之前,需要先建立一個生產者實例。生產者的程式碼如下所示:

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

程式碼中主要做了以下幾件事:

  • 配置生產者:設定生產者的配置,並指定分區方式為隨機分區,要求等待所有ISR節點都確認訊息後再返回,以及在發送成功後返回Partition和Offset。
  • 建立生產者:用指定的broker位址和設定建立一個生產者實例。
  • 傳送訊息:建立一個含有訊息主題和內容的訊息,並進行傳送。
  • 輸出結果:列印結果,記錄訊息分區和偏移量。

三、創建一個消費者

在其次,需要建立一個消費者實例。消費者的程式碼如下所示:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

程式碼中主要做了以下幾件事:

  • 設定消費者:設定消費者並設定錯誤回傳開關。
  • 建立消費者:根據指定的broker位址和設定建立一個消費者實例。
  • 取得分割區:取得指定topic的分割區。
  • 進行消費:對每個分區開啟一個goroutine進行單獨的消費。
  • 輸出結果:列印出消費到的訊息。

四、總結

以上,我們使用Golang實現了Kafka的生產者和消費者部分,作為實現分散式系統的重要組成部分之一,Kafka可以解決訊息系統在高並發以及分散式環境下存在的問題,Kafka也有良好的支援文件以及穩定的社區,在實際的開發中應用起來毫無壓力。

以上是用golang實作kafka的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn