Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Laksanakan baris gilir mesej menggunakan kafka dalam Beego

Laksanakan baris gilir mesej menggunakan kafka dalam Beego

WBOY
WBOYasal
2023-06-22 21:57:081043semak imbas

Dalam aplikasi web moden, pemesejan yang cekap adalah bahagian yang sangat penting. Baris gilir mesej ialah penyelesaian untuk penghantaran mesej tak segerak antara sistem yang berbeza, yang boleh mengoptimumkan penghantaran data dan kecekapan pemprosesan. Dalam bahasa Go, rangka kerja Beego ialah rangka kerja web yang sangat popular yang menyokong pembangunan aplikasi web dan API. Dalam artikel ini, kami akan meneroka cara melaksanakan baris gilir mesej menggunakan kafka dalam Beego untuk penghantaran mesej yang cekap.

1. Pengenalan kepada Kafka

Kafka ialah sistem baris gilir mesej yang diedarkan, dibahagikan, berbilang salinan, pada asalnya dibangunkan oleh LinkedIn dan kemudian diselenggara oleh Apache Software Foundation. Kafka digunakan terutamanya untuk memproses sejumlah besar data masa nyata, menyokong pemesejan pemprosesan tinggi, dan juga menyokong pelbagai aplikasi merentas berbilang pengguna dan pengeluar.

Konsep teras Kafka ialah topik, sekatan dan ofset. Topik merujuk kepada klasifikasi mesej, dan setiap mesej tergolong dalam topik tertentu. Partition ialah subset topik, dan setiap partition ialah baris gilir mesej yang teratur dan tidak berubah. Setiap partition boleh direplikasi merentas berbilang pelayan untuk menyokong berbilang pengguna memproses partition yang sama secara serentak. Offset ialah nilai yang mengenal pasti setiap mesej secara unik. Pengguna boleh menentukan ofset khusus untuk mula membaca mesej daripada.

2. Menggunakan Kafka dalam Beego

  1. Memasang Kafka

Memasang kafka adalah sangat mudah. ​​Anda hanya perlu memuat turun pakej termampat dari tapak web rasmi daripada kafka dan nyahzipnya Hanya pergi ke direktori yang ditentukan. Contoh menggunakan versi kafka_2.12-2.3.0.

  1. Mencipta topik dan partition

Sebelum anda mula menggunakan kafka, anda perlu mencipta topik dan partition baharu. Anda boleh menggunakan alat pengurusan Kafka sendiri (kafka-topics.sh) untuk mencipta topik dan partition. Jalankan arahan berikut dalam baris arahan:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Arahan ini akan mencipta topik bernama "ujian" dengan hanya satu partition dan nombor sandaran 1. Anda boleh menukar bilangan partition dan sandaran mengikut keperluan anda.

  1. Buat penerbit

Langkah-langkah untuk mencipta pengeluar kafka adalah seperti berikut:

package main

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 设置kafka配置
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    // 新建生产者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    // 构造消息
    message := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("test message"),
    }

    // 发送消息
    _, _, err = producer.SendMessage(message)
    if err != nil {
        panic(err)
    }

    producer.Close()
}

Antaranya, sarama ialah perpustakaan pelanggan bahasa Go untuk menyambung dan Mengendalikan kelompok kafka. Dalam kod di atas, kami mencipta objek SyncProducer baharu dan kemudian menghantar mesej kepada topik "ujian".

  1. Mewujudkan pengguna

Langkah-langkah untuk mencipta pengguna kafka adalah seperti berikut:

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "os/signal"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    // 新建一个消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    // 准备订阅话题
    topic := "test"
    partitionList, err := consumer.Partitions(topic)
    if err != nil {
        panic(err)
    }

    // 启动goroutine处理消息
    for _, partition := range partitionList {
        // 构造一个partitionConsumer
        pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        go func(partitionConsumer sarama.PartitionConsumer) {
            defer func() {
                // 关闭consumer
                if err := partitionConsumer.Close(); err != nil {
                    log.Fatalln(err)
                }
            }()
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s
",
                    msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }

    // 处理中断信号
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, os.Interrupt)
    <-sigterm

    fmt.Println("Shutdown")
    consumer.Close()
}

Kod di atas mencipta objek pengguna baharu dan melanggan ia "menguji" topik. Kemudian, berbilang goroutine dimulakan untuk memproses mesej daripada partition berbeza secara serentak. Selepas mesej diproses, kaedah Close() dipanggil untuk menutup pengguna.

3. Ringkasan

Dalam artikel ini, kami memperkenalkan cara menggunakan kafka untuk melaksanakan baris gilir mesej dalam Beego. Ini berguna untuk aplikasi web yang perlu memproses data pemprosesan tinggi. Dengan menggunakan Kafka, kami boleh menghantar mesej secara tidak segerak antara berbilang pengguna dan pengeluar untuk memaksimumkan pemindahan data dan kecekapan pemprosesan. Jika anda sedang membangunkan aplikasi Beego dan memerlukan pemesejan yang cekap, Kafka ialah pilihan terbaik.

Atas ialah kandungan terperinci Laksanakan baris gilir mesej menggunakan kafka dalam Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn