Maison >développement back-end >Golang >'Facteur de réplication non valide' dans le client Convergence Kafka Go

'Facteur de réplication non valide' dans le client Convergence Kafka Go

王林
王林avant
2024-02-14 10:30:09900parcourir

汇合 Kafka Go 客户端中的“无效复制因子”

l'éditeur php Xigua vous propose aujourd'hui un article sur le "facteur de réplication invalide" dans le client Kafka Go. Kafka est une plate-forme de traitement de flux distribuée hautes performances et évolutive, tandis que Go est un langage de programmation concis et efficace. Cet article se concentrera sur le problème du « facteur de réplication invalide » qui se produit dans le client Kafka Go, explorera ses causes et ses solutions, et aidera les lecteurs à mieux comprendre et gérer ce défi technique courant. Des facteurs de réplication non valides peuvent entraîner des incohérences de données et une dégradation des performances. Il est donc important que les utilisateurs de Kafka comprennent comment résoudre ce problème. Explorons ensemble en profondeur !

Contenu de la question

Je suis nouveau sur Kafka et j'essaie de démarrer mon projet. J'ai ça dans mon docker-compose.yml

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Ensuite, je lance mon fichier main.go avec les producteurs et les consommateurs et quelques sujets fictifs.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)



func main()  {
    topic := "HVSE"
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "client.id": "foo",
        "acks": "all",
    })
    go func() {
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":    "localhost:9092",
            "group.id":             "foo",
            "auto.offset.reset":    "smallest",
        })

        if err != nil {
            log.Fatal(err)
        }

        err = consumer.Subscribe(topic, nil)

        if err != nil {
            log.Fatal(err)
        }

        for {
                ev := consumer.Poll(100)
                // fmt.Println(ev)
                switch e := ev.(type) {
                case *kafka.Message:
                        fmt.Printf("consumed message from queue: %s\n", string(e.Value))
                case *kafka.Error:
                        fmt.Printf("%v\n", e)
                        // return
                // default:
                //      fmt.Printf("Ignored %v\n", e)
                }
        }
    }()

    deliverch := make(chan kafka.Event, 10000)
    for {
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value: []byte("FOO"),
        },
            deliverch,
        )
        if err != nil {
            log.Fatal(err)
        }
    
        <- deliverch
        time.Sleep(time.Second * 1)
    }
}

Si je décommente la valeur par défaut, j'y entre.

Sinon, j'obtiens cette erreur dans la console.

2023/09/26 13:45:05 Broker: Invalid replication factor
exit status 1

Mes conteneurs Kafka et Zookeeper fonctionnent.

J'ai modifié le fichier docker-compose.yml mais cela n'a pas aidé. J'ai trouvé que mon consumer.Events() est nul mais je ne comprends pas pourquoi cela se produit

Solution de contournement

J'ai copié votre code et il est correct mais le problème dans compose.yml n'est ajusté qu'ADVERTISED_LISTENERS

au lieu de ceci :

KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

Utilisez ceci :

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer