
Implement Kafka with Golang


As enterprise application architectures grow more complex, message transmission becomes a crucial component, and this is where Kafka comes to the fore. Kafka is an efficient and reliable distributed message queue that supports publishing and subscribing to messages, with very high throughput and low latency. Kafka clients are available for many languages, and Golang has become increasingly popular in recent years, so this article uses Golang to show how to produce and consume Kafka messages.

1. Dependencies

Before you start, you need to download the required dependencies:

  • sarama: the Golang Kafka client library
  • pkg/errors: error-wrapping helpers built on top of the Go standard library's errors package

Install them with go get:

go get github.com/Shopify/sarama
go get github.com/pkg/errors
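
After installing the packages, it is worth checking that the client library can actually reach your broker. The small sketch below is optional and assumes a Kafka broker listening on localhost:9092 (the address used throughout this article); it simply opens a sarama client and prints the topics the cluster currently knows about:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    // Assumes a broker is running locally on the default Kafka port.
    client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // List the topics currently known to the cluster.
    topics, err := client.Topics()
    if err != nil {
        panic(err)
    }
    fmt.Println("topics:", topics)
}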

2. Create a producer

The first step is to create a producer instance. The producer code is as follows:

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // brief pause between sends
    }
}

The code mainly does the following things:

  • Configure the producer: use random partitioning, require acknowledgement from all in-sync replicas (ISR) before a send is considered successful, and report the partition and offset of each successfully sent message (a keyed, hash-partitioned variation is sketched after this list).
  • Create a producer: create a synchronous producer instance from the broker address and the configuration.
  • Send messages: build a message with its topic and value, and send it.
  • Output results: print the partition and offset that each message was written to.
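
If you need all messages for the same business key to end up in the same partition (so that they stay ordered relative to each other), a small variation of the producer above is to set a Key on each message and switch to sarama's hash partitioner. The following is a minimal sketch of that idea, assuming the same localhost:9092 broker and test_topic as above; the key "user-42" is purely an illustrative value:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // Hash the message key, so every message with the same key
    // is routed to the same partition.
    config.Producer.Partitioner = sarama.NewHashPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "test_topic",
        Key:   sarama.StringEncoder("user-42"), // illustrative key
        Value: sarama.StringEncoder("keyed test message"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("keyed message stored in partition %d at offset %d\n", partition, offset)
}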

3. Create a consumer

Next, you need to create a consumer instance. The consumer code is as follows:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    cancel() // stop all partition goroutines via ctx.Done()
    fmt.Println("Shutting down consumer")
}

The code mainly does the following things:

  • Configure the consumer: enable delivery of consumer errors on the Errors() channel.
  • Create a consumer: create a consumer instance from the broker address and the configuration.
  • Get partitions: list the partitions of the topic to be consumed.
  • Consume: start one goroutine per partition, each consuming that partition independently (a consumer-group alternative is sketched after this list).
  • Output results: print each consumed message.
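
The per-partition consumer above is sarama's low-level API: you enumerate partitions yourself and no offsets are committed back to Kafka. For most applications a consumer group is more practical, because Kafka then assigns partitions to the group's members automatically and remembers committed offsets between restarts. Below is a minimal consumer-group sketch under the same assumptions (local broker, topic test_topic); the group id test_group is an arbitrary example, and sarama's consumer-group API requires config.Version to be at least 0.10.2:

package main

import (
    "context"
    "fmt"

    "github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("group consumed partition %d offset %d: %s\n", msg.Partition, msg.Offset, msg.Value)
        sess.MarkMessage(msg, "") // mark the offset as processed
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0 // consumer groups need Kafka >= 0.10.2
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_group", config)
    if err != nil {
        panic(err)
    }
    defer group.Close()

    for {
        // Consume blocks until the session ends (e.g. on a rebalance); loop to keep consuming.
        if err := group.Consume(context.Background(), []string{"test_topic"}, handler{}); err != nil {
            panic(err)
        }
    }
}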

4. Summary

Above, we used Golang to implement the producer and consumer sides of Kafka. As a key building block of distributed systems, Kafka addresses the problems messaging faces under high concurrency and in distributed environments, and it comes with good documentation and a stable community, so it can be adopted in real projects with confidence.

