Rumah >pembangunan bahagian belakang >Golang >Sambung Kafka dengan Golang

Sambung Kafka dengan Golang

WBOY
WBOYasal
2024-09-06 22:30:32565semak imbas

pengenalan

Jika anda perlu mengetahui asas Kafka, seperti ciri utama, komponen dan kelebihannya, saya ada artikel yang membincangkannya di sini. Sila semak dan ikuti langkah-langkah sehingga anda menyelesaikan pemasangan Kafka menggunakan Docker untuk meneruskan bahagian berikut.

Connect Kafka with Golang

Bersambung ke Kafka dengan Golang

Sama seperti contoh dalam artikel tentang menyambungkan Kafka dengan NodeJS, kod sumber ini juga termasuk dua bahagian: memulakan pengeluar untuk menghantar mesej ke Kafka dan menggunakan pengguna untuk melanggan mesej daripada topik.

Saya akan memecahkan kod kepada bahagian yang lebih kecil untuk pemahaman yang lebih baik. Mula-mula, mari kita takrifkan nilai pembolehubah.

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

- Di sini, pakej github.com/confluentinc/confluent-kafka-go/kafka digunakan untuk menyambung ke Kafka.

- broker ialah alamat hos; jika anda menggunakan ZooKeeper, gantikan alamat hos dengan sewajarnya.

- groupId dan topik boleh ditukar mengikut keperluan.

Seterusnya ialah memulakan penerbit.

func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

Kod di atas digunakan untuk menghantar tatasusunan mesej {"mesej 1", "mesej 2", "mesej 3"} kepada topik dan menggunakan pergi rutin untuk mengulangi acara dengan untuk e := julat p.Events() dan mencetak hasil penghantaran, sama ada ia berjaya atau gagal.

Seterusnya ialah mencipta pengguna untuk melanggan topik dan menerima mesej.

func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}

Akhir sekali, kerana ini adalah contoh mudah, panggil fungsi untuk mencipta pengeluar dan pengguna untuk digunakan. Dalam senario dunia sebenar, penggunaan pengeluar dan pengguna biasanya dilakukan pada dua pelayan berbeza dalam sistem perkhidmatan mikro.

func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golang

Selamat mengekod!


Jika anda mendapati kandungan ini membantu, sila lawati artikel asal di blog saya untuk menyokong pengarang dan meneroka kandungan yang lebih menarik.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


Sesetengah siri yang mungkin anda rasa menarik:

  • NodeJS
  •  Bertindak balas
  • Doker 
  • Kubernetes

Atas ialah kandungan terperinci Sambung Kafka dengan Golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn