Rumah >pembangunan bahagian belakang >Golang >Pemprosesan data masa nyata menggunakan Kafka dan Spark Streaming di Beego
Dengan perkembangan berterusan teknologi Internet dan IoT, jumlah data yang dijana dalam pengeluaran dan kehidupan kami semakin meningkat. Data ini memainkan peranan yang sangat penting dalam strategi perniagaan dan membuat keputusan syarikat. Untuk menggunakan data ini dengan lebih baik, pemprosesan data masa nyata telah menjadi bahagian penting dalam kerja harian perusahaan dan institusi penyelidikan saintifik. Dalam artikel ini, kami akan meneroka cara menggunakan Kafka dan Spark Streaming dalam rangka kerja Beego untuk pemprosesan data masa nyata.
1. Apakah itu Kafka
Kafka ialah sistem baris gilir mesej yang diedarkan tinggi yang digunakan untuk memproses data besar-besaran. Kafka menyimpan data mesej dalam berbilang topik dengan cara yang diedarkan dan boleh diambil dan diedarkan dengan cepat. Dalam senario penstriman data, Kafka telah menjadi salah satu sistem pemesejan sumber terbuka yang paling popular dan digunakan secara meluas oleh banyak syarikat teknologi termasuk LinkedIn, Netflix dan Twitter.
2. Apakah itu Spark Streaming
Spark Streaming ialah komponen dalam ekosistem Apache Spark Ia menyediakan rangka kerja pengkomputeran penstriman yang boleh melakukan pemprosesan kumpulan masa nyata bagi strim data. Spark Streaming sangat berskala dan bertolak ansur dengan kesalahan, serta boleh menyokong berbilang sumber data. Spark Streaming boleh digunakan bersama dengan sistem baris gilir mesej seperti Kafka untuk melaksanakan fungsi pengkomputeran penstriman.
3 Gunakan Kafka dan Spark Streaming dalam Beego untuk pemprosesan data masa nyata
Apabila menggunakan rangka kerja Beego untuk pemprosesan data masa nyata, kami boleh menggabungkan Kafka dan Spark Streaming untuk mencapai penerimaan data dan pemprosesan. Berikut ialah proses pemprosesan data masa nyata yang mudah:
1 Gunakan Kafka untuk mewujudkan baris gilir mesej, merangkum data ke dalam mesej dan menghantarnya ke Kafka.
2. Gunakan Spark Streaming untuk membina aplikasi penstriman dan melanggan data dalam baris gilir mesej Kafka.
3. Untuk data yang dilanggan, kami boleh melakukan pelbagai operasi pemprosesan yang kompleks, seperti pembersihan data, pengagregatan data, pengiraan perniagaan, dsb.
4. Keluarkan hasil pemprosesan kepada Kafka atau paparkannya secara visual kepada pengguna.
Di bawah ini kami akan memperkenalkan secara terperinci cara melaksanakan proses di atas.
1. Wujudkan baris gilir mesej Kafka
Pertama, kami perlu memperkenalkan pakej Kafka ke dalam Beego Anda boleh menggunakan pakej sarama dalam bahasa go dan dapatkannya melalui arahan:
pergi dapatkan gopkg.in/Shopify/sarama.v1
Kemudian, wujudkan baris gilir mesej Kafka dalam Beego dan hantar data yang dijana kepada Kafka. Kod sampel adalah seperti berikut:
func initKafka() (err error) {
//配置Kafka连接属性 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true //创建Kafka连接器 client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { fmt.Println("failed to create producer, err:", err) return } //异步关闭Kafka defer client.Close() //模拟生成数据 for i := 1; i < 5000; i++ { id := uint32(i) userName := fmt.Sprintf("user:%d", i) //数据转为byte格式发送到Kafka message := fmt.Sprintf("%d,%s", id, userName) msg := &sarama.ProducerMessage{} msg.Topic = "test" //topic消息标记 msg.Value = sarama.StringEncoder(message) //消息数据 _, _, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed:", err) } time.Sleep(time.Second) } return
}
Dalam kod di atas, kami menggunakan kaedah SyncProducer dalam pakej Sarama untuk mencipta penyambung Kafka dan menetapkan sifat sambungan yang diperlukan. Kemudian gunakan gelung for untuk menjana data, dan merangkum data yang dijana ke dalam mesej dan menghantarnya ke Kafka.
2. Gunakan Spark Streaming untuk pemprosesan data masa nyata
Apabila menggunakan Spark Streaming untuk pemprosesan data masa nyata, kita perlu memasang dan mengkonfigurasi Spark dan Kafka, yang boleh dipasang melalui arahan berikut:
sudo apt-get install spark
sudo apt-get install zookeeper
sudo apt-get install kafka
Selepas melengkapkan pemasangan, kita perlu memperkenalkan Spark Streaming dalam Pakej Beego:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Second, StreamingContext}
import org.apache.spark.kafka.KafkaUtils
Seterusnya, kita perlu memproses strim data. Kod berikut melaksanakan logik untuk menerima data daripada Kafka dan memproses setiap mesej:
func main() {
//创建SparkConf对象 conf := SparkConf().setAppName("test").setMaster("local[2]") //创建StreamingContext对象,设置1秒钟处理一次 ssc := StreamingContext(conf, Seconds(1)) //从Kafka中订阅test主题中的数据 zkQuorum := "localhost:2181" group := "test-group" topics := map[string]int{"test": 1} directKafkaStream, err := KafkaUtils.CreateDirectStream(ssc, topics, zkQuorum, group) if err != nil { panic(err) } lines := directKafkaStream.Map(func(message *sarama.ConsumerMessage) (string, int) { //从消息中解析出需要的数据 data := message.Value arr := strings.Split(string(data), ",") id, _ := strconv.Atoi(arr[0]) name := arr[1] return name, 1 }) //使用reduceByKey函数对数据进行聚合计算 counts := lines.ReduceByKey(func(a, b int) int { return a + b }) counts.Print() //开启流式处理 ssc.Start() ssc.AwaitTermination()
}
Dalam kod di atas, kami Menggunakan SparkConf kaedah dan kaedah StreamingContext untuk mencipta konteks Spark Streaming dan menetapkan selang masa pemprosesan aliran data. Kemudian kami melanggan data dalam baris gilir mesej Kafka, menggunakan kaedah Peta untuk menghuraikan data yang diperlukan daripada mesej yang diterima, dan kemudian menggunakan kaedah ReduceByKey untuk melaksanakan pengiraan pengagregatan data. Akhirnya, hasil pengiraan dicetak ke konsol.
4. Ringkasan
Artikel ini memperkenalkan cara menggunakan Kafka dan Spark Streaming dalam rangka kerja Beego untuk pemprosesan data masa nyata. Dengan mewujudkan baris gilir mesej Kafka dan menggunakan Spark Streaming untuk memproses aliran data, proses pemprosesan data masa nyata yang diperkemas dan cekap boleh dicapai. Kaedah pemprosesan ini telah digunakan secara meluas dalam pelbagai bidang dan menyediakan rujukan penting untuk membuat keputusan korporat.
Atas ialah kandungan terperinci Pemprosesan data masa nyata menggunakan Kafka dan Spark Streaming di Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!