Maison >développement back-end >Golang >Implémenter Kafka avec Golang

Implémenter Kafka avec Golang

王林
王林original
2023-05-10 13:18:371616parcourir

Avec la complexité croissante de l'architecture des applications au niveau de l'entreprise, la transmission des messages est devenue un élément crucial. C’est alors que Kafka apparaît. Kafka est une file d'attente de messages distribuée efficace et fiable qui prend en charge la publication et l'abonnement des messages. Il s'agit d'un système de messagerie d'entreprise moderne avec un débit très élevé et une faible latence. Dans l'API de Kafka, bien que le client officiel propose plusieurs langues, Golang est devenu de plus en plus largement utilisé ces dernières années, c'est pourquoi cet article utilise Golang comme langage d'implémentation pour expliquer comment utiliser Golang pour implémenter Kafka.

1. Dépendances

Avant de commencer, vous devez télécharger les dépendances requises :

  • sarama : bibliothèque client Golang Kafka
  • pkg/errors : encapsuler le package d'erreurs de la bibliothèque standard Go

Méthode d'utilisation spécifique Comme suit :

allez sur github.com/Shopify/sarama
allez sur github.com/pkg/errors

2. Créer un producteur

Avant d'introduire l'API de Kafka, vous devez d'abord créer une instance de producteur. Le code du producteur est le suivant :

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

Le code fait principalement les choses suivantes :

  • Configurer le producteur : définissez la configuration du producteur, spécifiez la méthode de partitionnement comme partitionnement aléatoire, et attendez que tous les nœuds ISR confirmez le message, puis revenez et renvoyez la partition et le décalage après une transmission réussie.
  • Créer un producteur : créez une instance de producteur avec l'adresse et la configuration du courtier spécifiées.
  • Envoyer un message : créez un message avec le sujet et le contenu du message, puis envoyez-le.
  • Résultats de sortie : résultats d'impression, partition d'enregistrement et décalage des messages.

3. Créer un consommateur

Deuxièmement, vous devez créer une instance de consommateur. Le code du consommateur est le suivant :

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

Le code fait principalement les choses suivantes :

  • Configurer le consommateur : configurez le consommateur et définissez le commutateur de retour d'erreur.
  • Créer un consommateur : créez une instance de consommateur basée sur l'adresse et la configuration du courtier spécifiées.
  • Obtenir la partition : obtenez la partition du sujet spécifié.
  • Consommation : ouvrez une goroutine pour chaque partition pour une consommation séparée.
  • Résultats de sortie : imprimez les messages consommés.

IV.Résumé

Ci-dessus, nous avons utilisé Golang pour implémenter les parties producteur et consommateur de Kafka. En tant que l'un des composants importants de la mise en œuvre de systèmes distribués, Kafka peut résoudre le problème des systèmes de messagerie existant dans les environnements distribués et à haute concurrence. problèmes, et Kafka dispose également d'une bonne documentation de support et d'une communauté stable, ce qui rend son application sans stress dans le développement réel.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Article précédent:requête par lots Redis GolangArticle suivant:requête par lots Redis Golang