"Invalid replication factor" in the Confluent Kafka Go client

王林
2024-02-14 10:30:09

This article looks at the "Invalid replication factor" error reported by the Confluent Kafka Go client (confluent-kafka-go) when it runs against a single-broker Docker Compose setup, and at the listener configuration change that resolves it.

Question content

I am new to Kafka and trying to get my project started. I have this in my docker-compose.yml:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Then I run my main.go file with producers and consumers and some mock topics.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)



func main() {
    topic := "HVSE"
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "client.id":         "foo",
        "acks":              "all",
    })
    // Check the producer error before err is reused further down.
    if err != nil {
        log.Fatal(err)
    }
    go func() {
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":    "localhost:9092",
            "group.id":             "foo",
            "auto.offset.reset":    "smallest",
        })

        if err != nil {
            log.Fatal(err)
        }

        err = consumer.Subscribe(topic, nil)

        if err != nil {
            log.Fatal(err)
        }

        for {
            ev := consumer.Poll(100)
            // fmt.Println(ev)
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("consumed message from queue: %s\n", string(e.Value))
            case kafka.Error:
                // Poll delivers errors as kafka.Error values, not pointers.
                fmt.Printf("%v\n", e)
                // return
                // default:
                //      fmt.Printf("Ignored %v\n", e)
            }
        }
    }()

    deliverch := make(chan kafka.Event, 10000)
    for {
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value: []byte("FOO"),
        },
            deliverch,
        )
        if err != nil {
            log.Fatal(err)
        }
    
        <- deliverch
        time.Sleep(time.Second * 1)
    }
}

If I uncomment the default case, execution falls into it.

Otherwise I get this error in the console:

2023/09/26 13:45:05 Broker: Invalid replication factor
exit status 1

My Kafka and ZooKeeper containers are running.

I changed the docker-compose.yml file, but that didn't help. I found that my consumer.Events() is nil, but I don't understand why this is happening.

Workaround

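I copied your code and it works; the problem is in docker-compose.yml, and only KAFKA_ADVERTISED_LISTENERS needs adjusting. The original value advertises the hostname kafka, but the service is named broker, and it advertises localhost:29092 to the host while only port 9092 is published.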

instead of this:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

use this:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
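To make the difference between the two values concrete, here is a minimal Go sketch that parses an advertised-listeners string and compares it with the rest of the Compose file. The helpers parseAdvertisedListeners and checkListeners are hypothetical names made up for this illustration and are not part of confluent-kafka-go. The two rules it checks (the PLAINTEXT listener uses the service name, the PLAINTEXT_HOST listener uses the published port) are assumptions that fit this particular setup, not general Kafka requirements. It uses only the standard library and needs Go 1.18 or later for strings.Cut.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// endpoint is one advertised listener: the address the broker hands to clients.
type endpoint struct {
	Host string
	Port string
}

// parseAdvertisedListeners (hypothetical helper) splits a value such as
// "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" into a map
// keyed by listener name.
func parseAdvertisedListeners(value string) (map[string]endpoint, error) {
	out := make(map[string]endpoint)
	for _, item := range strings.Split(value, ",") {
		name, addr, ok := strings.Cut(strings.TrimSpace(item), "://")
		if !ok {
			return nil, fmt.Errorf("listener %q is not of the form NAME://host:port", item)
		}
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("listener %q: %w", item, err)
		}
		out[name] = endpoint{Host: host, Port: port}
	}
	return out, nil
}

// checkListeners (hypothetical helper) applies two assumptions that match the
// Compose file in the question: the PLAINTEXT listener should advertise the
// service name, and the PLAINTEXT_HOST listener should advertise the port
// that is published to the host.
func checkListeners(value, serviceName, publishedPort string) []string {
	listeners, err := parseAdvertisedListeners(value)
	if err != nil {
		return []string{err.Error()}
	}
	var problems []string
	if in, ok := listeners["PLAINTEXT"]; ok && in.Host != serviceName {
		problems = append(problems, fmt.Sprintf(
			"PLAINTEXT advertises host %q, but the service is named %q", in.Host, serviceName))
	}
	if ext, ok := listeners["PLAINTEXT_HOST"]; ok && ext.Port != publishedPort {
		problems = append(problems, fmt.Sprintf(
			"PLAINTEXT_HOST advertises port %s, but only port %s is published", ext.Port, publishedPort))
	}
	return problems
}

func main() {
	values := []string{
		"PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092",  // from the question
		"PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092", // from the answer
	}
	for _, v := range values {
		fmt.Println(v)
		for _, p := range checkListeners(v, "broker", "9092") {
			fmt.Println("  problem:", p)
		}
	}
}
```

Run against the two values above, the sketch flags both the hostname and the port of the original setting and reports nothing for the adjusted one. A client started on the host keeps using localhost:9092 as bootstrap.servers in both cases.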

Statement:
This article is reproduced from stackoverflow.com.