Heim >Backend-Entwicklung >Golang >Implementieren Sie Kafka mit Golang
Mit der zunehmenden Komplexität der Anwendungsarchitektur auf Unternehmensebene ist die Nachrichtenübertragung zu einer entscheidenden Komponente geworden. Hier tritt Kafka in den Vordergrund. Kafka ist eine effiziente und zuverlässige verteilte Nachrichtenwarteschlange, die die Veröffentlichung und das Abonnement von Nachrichten unterstützt. Es handelt sich um ein modernes Messaging-System auf Unternehmensebene mit sehr hohem Durchsatz und geringer Latenz. Obwohl der offizielle Client mehrere Sprachen in der Kafka-API bereitstellt, wird Golang in den letzten Jahren immer häufiger verwendet. Daher wird in diesem Artikel Golang als Implementierungssprache verwendet, um zu erklären, wie Golang zur Implementierung von Kafka verwendet wird.
1. Abhängigkeiten
Bevor Sie beginnen, müssen Sie die erforderlichen Abhängigkeiten herunterladen:
Spezifische Verwendungsmethode Wie folgt :
Gehen Sie zu github.com/Shopify/sarama
Gehen Sie zu github.com/pkg/errors
2. Erstellen Sie einen Produzenten
Bevor Sie Kafkas API einführen, müssen Sie zunächst eine Produzenteninstanz erstellen. Der Code des Produzenten lautet wie folgt:
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
Der Code macht hauptsächlich die folgenden Dinge:
3. Erstellen Sie einen Verbraucher
Zweitens müssen Sie eine Verbraucherinstanz erstellen. Der Verbrauchercode lautet wie folgt:
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
Der Code führt hauptsächlich die folgenden Aufgaben aus:
4. Zusammenfassung
Oben haben wir Golang verwendet, um die Produzenten- und Verbraucherteile von Kafka zu implementieren. Als eine der wichtigen Komponenten bei der Implementierung verteilter Systeme kann Kafka das Problem von Nachrichtensystemen lösen, die in Umgebungen mit hoher Parallelität und verteilten Umgebungen vorhanden sind. Probleme, und Kafka verfügt außerdem über eine gute Supportdokumentation und eine stabile Community, sodass die Anwendung in der tatsächlichen Entwicklung stressfrei ist.
Das obige ist der detaillierte Inhalt vonImplementieren Sie Kafka mit Golang. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!