Maison >développement back-end >Golang >Connectez Kafka à Golang

Connectez Kafka à Golang

WBOY
WBOYoriginal
2024-09-06 22:30:32565parcourir

Introduction

Si vous avez besoin de connaître les bases de Kafka, telles que ses principales fonctionnalités, composants et avantages, j'ai un article qui traite de cela ici. Veuillez le consulter et suivre les étapes jusqu'à ce que vous ayez terminé l'installation de Kafka à l'aide de Docker pour passer aux sections suivantes.

Connect Kafka with Golang

Se connecter à Kafka avec Golang

Semblable à l'exemple de l'article sur la connexion de Kafka avec NodeJS, ce code source comprend également deux parties : initialiser un producteur d'envoyer des messages à Kafka et d'utiliser un consommateur pour s'abonner aux messages d'un sujet.

Je vais diviser le code en parties plus petites pour une meilleure compréhension. Commençons par définir les valeurs des variables.

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

- Ici, le package github.com/confluentinc/confluent-kafka-go/kafka est utilisé pour se connecter à Kafka.

- Le courtier est l'adresse de l'hôte ; si vous utilisez ZooKeeper, remplacez l'adresse de l'hôte en conséquence.

- Le groupId et le sujet peuvent être modifiés selon les besoins.

Ensuite, l'initialisation du producteur.

func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

Le code ci-dessus est utilisé pour envoyer un tableau de messages {"message 1", "message 2", "message 3"} à un sujet et utilise un go-routine pour parcourir les événements avec for e := range p.Events() et imprimer le résultat de la livraison, qu'il s'agisse d'un succès ou échec.

Ensuite, créez un consommateur pour s'abonner au sujet et recevoir des messages.

func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}

Enfin, puisqu'il s'agit d'un exemple simple, appelez les fonctions pour créer le producteur et le consommateur à utiliser. Dans un scénario réel, le déploiement du producteur et du consommateur s'effectue généralement sur deux serveurs différents dans un système de microservices.

func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golang

Bon codage !


Si vous avez trouvé ce contenu utile, veuillez visiter l'article original sur mon blog pour soutenir l'auteur et explorer un contenu plus intéressant.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


Quelques séries qui pourraient vous intéresser :

  • NodeJS
  •  Réagissez
  • Docker 
  • Kubernetes

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn