Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Laksanakan kafka dengan golang

Laksanakan kafka dengan golang

王林
王林asal
2023-05-10 13:18:371472semak imbas

Apabila seni bina aplikasi peringkat perusahaan menjadi semakin kompleks, pemesejan telah menjadi komponen penting. Ini adalah apabila Kafka menjadi tumpuan. Kafka ialah baris gilir mesej edaran yang cekap dan boleh dipercayai yang menyokong penerbitan dan langganan mesej Ia adalah sistem pemesejan peringkat perusahaan moden dengan daya pemprosesan yang sangat tinggi dan kependaman yang rendah. Dalam API Kafka, walaupun pelanggan rasmi menyediakan berbilang bahasa, Golang telah semakin digunakan secara meluas sejak beberapa tahun kebelakangan ini, jadi artikel ini menggunakan Golang sebagai bahasa pelaksanaan untuk menerangkan cara menggunakan Golang untuk melaksanakan Kafka.

1. Ketergantungan

Sebelum bermula, anda perlu memuat turun kebergantungan yang diperlukan:

  • sarama: Pustaka pelanggan Golang Kafka
  • pkg /errors : Merangkumkan pakej ralat pustaka standard Go

Kaedah penggunaan khusus adalah seperti berikut:

go get github.com/Shopify/sarama
go get github.com / pkg/errors

2 Buat pengeluar

Sebelum memperkenalkan API Kafka, anda perlu mencipta contoh pengeluar terlebih dahulu. Kod pengeluar adalah seperti berikut:

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

Kod terutamanya melakukan perkara berikut:

  • Konfigurasikan pengeluar: Tetapkan konfigurasi pengeluar dan nyatakan kaedah pembahagian sebagai Pembahagian rawak memerlukan menunggu semua nod ISR untuk mengesahkan mesej sebelum kembali, dan mengembalikan Partition dan Offset selepas penghantaran berjaya.
  • Buat pengeluar: Buat contoh pengeluar dengan alamat dan konfigurasi broker yang ditentukan.
  • Hantar mesej: Buat mesej dengan subjek dan kandungan mesej, dan hantarkannya.
  • Hasil output: cetak keputusan, rekod partition mesej dan offset.

3. Buat pengguna

Kedua, anda perlu mencipta contoh pengguna. Kod pengguna adalah seperti berikut:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

Kod terutamanya melakukan perkara berikut:

  • Konfigurasikan pengguna: Konfigurasikan pengguna dan tetapkan suis pemulangan ralat.
  • Buat pengguna: Buat contoh pengguna berdasarkan alamat dan konfigurasi broker yang ditentukan.
  • Dapatkan partition: Dapatkan partition topik yang ditentukan.
  • Penggunaan: Buka goroutine untuk setiap partition untuk penggunaan berasingan.
  • Hasil output: Cetak mesej yang digunakan.

4. Ringkasan

Di atas, kami menggunakan Golang untuk melaksanakan bahagian pengeluar dan pengguna Kafka Sebagai salah satu komponen penting dalam merealisasikan sistem teragih, Kafka boleh menyelesaikan mesej The sistem mempunyai masalah dalam persekitaran selaras dan teragih yang tinggi, dan Kafka juga mempunyai dokumentasi sokongan yang baik dan komuniti yang stabil, menjadikannya bebas tekanan untuk digunakan dalam pembangunan sebenar.

Atas ialah kandungan terperinci Laksanakan kafka dengan golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn