Heim  >  Artikel  >  Backend-Entwicklung  >  Implementieren Sie Kafka mit Golang

Implementieren Sie Kafka mit Golang

王林
王林Original
2023-05-10 13:18:371538Durchsuche

Mit der zunehmenden Komplexität der Anwendungsarchitektur auf Unternehmensebene ist die Nachrichtenübertragung zu einer entscheidenden Komponente geworden. Hier tritt Kafka in den Vordergrund. Kafka ist eine effiziente und zuverlässige verteilte Nachrichtenwarteschlange, die die Veröffentlichung und das Abonnement von Nachrichten unterstützt. Es handelt sich um ein modernes Messaging-System auf Unternehmensebene mit sehr hohem Durchsatz und geringer Latenz. Obwohl der offizielle Client mehrere Sprachen in der Kafka-API bereitstellt, wird Golang in den letzten Jahren immer häufiger verwendet. Daher wird in diesem Artikel Golang als Implementierungssprache verwendet, um zu erklären, wie Golang zur Implementierung von Kafka verwendet wird.

1. Abhängigkeiten

Bevor Sie beginnen, müssen Sie die erforderlichen Abhängigkeiten herunterladen:

  • sarama: Golang Kafka-Clientbibliothek
  • pkg/errors: Kapseln Sie das Fehlerpaket der Go-Standardbibliothek

Spezifische Verwendungsmethode Wie folgt :

Gehen Sie zu github.com/Shopify/sarama
Gehen Sie zu github.com/pkg/errors

2. Erstellen Sie einen Produzenten

Bevor Sie Kafkas API einführen, müssen Sie zunächst eine Produzenteninstanz erstellen. Der Code des Produzenten lautet wie folgt:

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

Der Code macht hauptsächlich die folgenden Dinge:

  • Konfigurieren Sie den Produzenten: Legen Sie die Konfiguration des Produzenten fest, geben Sie die Partitionierungsmethode als zufällige Partitionierung an und warten Sie, bis alle ISR-Knoten dies tun Bestätigen Sie die Meldung. Dann kehren Sie zurück und geben nach erfolgreichem Senden Partition und Offset zurück.
  • Produzenten erstellen: Erstellen Sie eine Produzenteninstanz mit der angegebenen Broker-Adresse und Konfiguration.
  • Nachricht senden: Erstellen Sie eine Nachricht mit Betreff und Inhalt der Nachricht und senden Sie sie.
  • Ergebnisse ausgeben: Ergebnisse drucken, Nachrichtenpartition und Offset aufzeichnen.

3. Erstellen Sie einen Verbraucher

Zweitens müssen Sie eine Verbraucherinstanz erstellen. Der Verbrauchercode lautet wie folgt:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

Der Code führt hauptsächlich die folgenden Aufgaben aus:

  • Konfigurieren Sie den Verbraucher: Konfigurieren Sie den Verbraucher und stellen Sie den Fehlerrückgabeschalter ein.
  • Verbraucher erstellen: Erstellen Sie eine Verbraucherinstanz basierend auf der angegebenen Broker-Adresse und -Konfiguration.
  • Partition abrufen: Ruft die Partition des angegebenen Themas ab.
  • Verbrauch: Öffnen Sie für jede Partition eine Goroutine für den separaten Verbrauch.
  • Ergebnisse ausgeben: Drucken Sie die konsumierten Nachrichten aus.

4. Zusammenfassung

Oben haben wir Golang verwendet, um die Produzenten- und Verbraucherteile von Kafka zu implementieren. Als eine der wichtigen Komponenten bei der Implementierung verteilter Systeme kann Kafka das Problem von Nachrichtensystemen lösen, die in Umgebungen mit hoher Parallelität und verteilten Umgebungen vorhanden sind. Probleme, und Kafka verfügt außerdem über eine gute Supportdokumentation und eine stabile Community, sodass die Anwendung in der tatsächlichen Entwicklung stressfrei ist.

Das obige ist der detaillierte Inhalt vonImplementieren Sie Kafka mit Golang. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:Redis Golang Batch-AbfrageNächster Artikel:Redis Golang Batch-Abfrage