Maison >développement back-end >Golang >Implémenter Kafka avec Golang
Avec la complexité croissante de l'architecture des applications au niveau de l'entreprise, la transmission des messages est devenue un élément crucial. C’est alors que Kafka apparaît. Kafka est une file d'attente de messages distribuée efficace et fiable qui prend en charge la publication et l'abonnement des messages. Il s'agit d'un système de messagerie d'entreprise moderne avec un débit très élevé et une faible latence. Dans l'API de Kafka, bien que le client officiel propose plusieurs langues, Golang est devenu de plus en plus largement utilisé ces dernières années, c'est pourquoi cet article utilise Golang comme langage d'implémentation pour expliquer comment utiliser Golang pour implémenter Kafka.
1. Dépendances
Avant de commencer, vous devez télécharger les dépendances requises :
Méthode d'utilisation spécifique Comme suit :
allez sur github.com/Shopify/sarama
allez sur github.com/pkg/errors
2. Créer un producteur
Avant d'introduire l'API de Kafka, vous devez d'abord créer une instance de producteur. Le code du producteur est le suivant :
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
Le code fait principalement les choses suivantes :
3. Créer un consommateur
Deuxièmement, vous devez créer une instance de consommateur. Le code du consommateur est le suivant :
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
Le code fait principalement les choses suivantes :
IV.Résumé
Ci-dessus, nous avons utilisé Golang pour implémenter les parties producteur et consommateur de Kafka. En tant que l'un des composants importants de la mise en œuvre de systèmes distribués, Kafka peut résoudre le problème des systèmes de messagerie existant dans les environnements distribués et à haute concurrence. problèmes, et Kafka dispose également d'une bonne documentation de support et d'une communauté stable, ce qui rend son application sans stress dans le développement réel.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!