"Invalid replication factor" in Confluent Kafka Go client
I am new to Kafka and trying to start my project. I have this in my docker-compose.yml:

    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.3.0
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      broker:
        image: confluentinc/cp-kafka:7.3.0
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Then I run my main.go file with producers and consumers and some mock topics.
    package main

    import (
        "fmt"
        "log"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        topic := "HVSE"
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092",
            "client.id":         "foo",
            "acks":              "all",
        })

        go func() {
            consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
                "bootstrap.servers": "localhost:9092",
                "group.id":          "foo",
                "auto.offset.reset": "smallest",
            })
            if err != nil {
                log.Fatal(err)
            }
            err = consumer.Subscribe(topic, nil)
            if err != nil {
                log.Fatal(err)
            }
            for {
                ev := consumer.Poll(100)
                // fmt.Println(ev)
                switch e := ev.(type) {
                case *kafka.Message:
                    fmt.Printf("consumed message from queue: %s\n", string(e.Value))
                case *kafka.Error:
                    fmt.Printf("%v\n", e)
                    // return
                // default:
                //     fmt.Printf("Ignored %v\n", e)
                }
            }
        }()

        deliverch := make(chan kafka.Event, 10000)
        for {
            err = p.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte("FOO"),
            },
                deliverch,
            )
            if err != nil {
                log.Fatal(err)
            }
            <-deliverch
            time.Sleep(time.Second * 1)
        }
    }
If I uncomment the default case, execution ends up in it.
Otherwise I get this error in the console.
    2023/09/26 13:45:05 Broker: Invalid replication factor
    exit status 1
My Kafka and ZooKeeper containers are running.
I changed the docker-compose.yml file, but that didn't help. I found that my consumer.Events() is nil, but I don't understand why this is happening.
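I copied your code and it is correct. The problem is in compose.yml: only KAFKA_ADVERTISED_LISTENERS needs to be adjusted. The compose file publishes container port 9092 to the host, but the host-facing listener is advertised as localhost:29092, and the internal listener is advertised under the hostname kafka even though the service is named broker.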
Instead of this:

    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

use this:

    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
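To make the difference between the two values concrete, here is a minimal sketch in plain Go (standard library only, no Kafka client involved). It parses a KAFKA_ADVERTISED_LISTENERS string and checks whether any listener advertises the address the Go client uses as bootstrap.servers. The helper names parseAdvertisedListeners and advertises are hypothetical, and the check itself is only a sanity heuristic that assumes a single-broker local setup like the one in the question; it is not something Kafka or the client library performs.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseAdvertisedListeners (hypothetical helper) splits a value such as
// "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" into a map
// from listener name to "host:port".
func parseAdvertisedListeners(value string) (map[string]string, error) {
	listeners := make(map[string]string)
	for _, entry := range strings.Split(value, ",") {
		entry = strings.TrimSpace(entry)
		name, addr, ok := strings.Cut(entry, "://")
		if !ok || name == "" {
			return nil, fmt.Errorf("malformed listener %q", entry)
		}
		// Reject entries whose address is not a valid host:port pair.
		if _, _, err := net.SplitHostPort(addr); err != nil {
			return nil, fmt.Errorf("listener %q: %w", name, err)
		}
		listeners[name] = addr
	}
	return listeners, nil
}

// advertises (hypothetical helper) reports whether any listener
// advertises exactly the given "host:port" address.
func advertises(listeners map[string]string, addr string) bool {
	for _, a := range listeners {
		if a == addr {
			return true
		}
	}
	return false
}

func main() {
	// Address used as bootstrap.servers by the Go client in the question.
	bootstrap := "localhost:9092"

	configs := []string{
		"PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092",  // original value
		"PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092", // corrected value
	}
	for _, c := range configs {
		listeners, err := parseAdvertisedListeners(c)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s\n  advertises %s: %t\n", c, bootstrap, advertises(listeners, bootstrap))
	}
}
```

Run against the two values above, the check reports false for the original setting and true for the corrected one, which matches the fix: the address advertised to clients on the host now agrees with the published port 9092.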