Maison >développement back-end >Golang >Connectez Kafka à Golang
Si vous avez besoin de connaître les bases de Kafka, telles que ses principales fonctionnalités, composants et avantages, j'ai un article qui traite de cela ici. Veuillez le consulter et suivre les étapes jusqu'à ce que vous ayez terminé l'installation de Kafka à l'aide de Docker pour passer aux sections suivantes.
Semblable à l'exemple de l'article sur la connexion de Kafka avec NodeJS, ce code source comprend également deux parties : initialiser un producteur d'envoyer des messages à Kafka et d'utiliser un consommateur pour s'abonner aux messages d'un sujet.
Je vais diviser le code en parties plus petites pour une meilleure compréhension. Commençons par définir les valeurs des variables.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- Ici, le package github.com/confluentinc/confluent-kafka-go/kafka est utilisé pour se connecter à Kafka.
- Le courtier est l'adresse de l'hôte ; si vous utilisez ZooKeeper, remplacez l'adresse de l'hôte en conséquence.
- Le groupId et le sujet peuvent être modifiés selon les besoins.
Ensuite, l'initialisation du producteur.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
Le code ci-dessus est utilisé pour envoyer un tableau de messages {"message 1", "message 2", "message 3"} à un sujet et utilise un go-routine pour parcourir les événements avec for e := range p.Events() et imprimer le résultat de la livraison, qu'il s'agisse d'un succès ou échec.
Ensuite, créez un consommateur pour s'abonner au sujet et recevoir des messages.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Enfin, puisqu'il s'agit d'un exemple simple, appelez les fonctions pour créer le producteur et le consommateur à utiliser. Dans un scénario réel, le déploiement du producteur et du consommateur s'effectue généralement sur deux serveurs différents dans un système de microservices.
func main() { startProducer() startConsumer() }
Bon codage !
Si vous avez trouvé ce contenu utile, veuillez visiter l'article original sur mon blog pour soutenir l'auteur et explorer un contenu plus intéressant.
Quelques séries qui pourraient vous intéresser :
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!