Rumah > Artikel > pembangunan bahagian belakang > Pemprosesan strim masa nyata menggunakan Kafka dan Flink dalam Beego
Dengan kemunculan era data besar, kami selalunya perlu memproses dan menganalisis data masa nyata. Teknologi pemprosesan strim masa nyata telah menjadi kaedah arus perdana untuk memproses data masa nyata berskala besar kerana prestasinya yang tinggi, berskala tinggi dan kependaman yang rendah. Dalam teknologi pemprosesan strim masa nyata, Kafka dan Flink adalah komponen biasa dan telah digunakan secara meluas dalam banyak sistem pemprosesan data peringkat perusahaan. Dalam artikel ini, kami akan memperkenalkan cara menggunakan Kafka dan Flink dalam Beego untuk pemprosesan strim masa nyata.
1. Pengenalan kepada Kafka
Apache Kafka ialah platform pemprosesan strim teragih. Ia memisahkan data ke dalam strim (data penstriman) dan mengedarkan data merentas berbilang nod, memberikan prestasi tinggi, ketersediaan tinggi, berskala tinggi dan beberapa ciri lanjutan, seperti jaminan Exactly-Once. Peranan utama Kafka adalah sebagai sistem pemesejan yang boleh dipercayai yang boleh digunakan untuk menyelesaikan masalah komunikasi antara pelbagai komponen dalam sistem teragih dan penghantaran mesej yang boleh dipercayai.
2. Pengenalan kepada Flink
Flink ialah rangka kerja pemprosesan strim data besar yang dipacu peristiwa, diedarkan, dan berprestasi tinggi. Ia menyokong pemprosesan strim dan kelompok, mempunyai pertanyaan seperti SQL dan keupayaan pemprosesan strim, menyokong pengkomputeran penstriman yang sangat boleh dikomposisikan, dan mempunyai sokongan storan tetingkap dan data yang kaya.
3. Kafka dalam Beego
Menggunakan Kafka dalam Beego terbahagi kepada dua bahagian, iaitu pengguna Kafka dan pengeluar Kafka.
Menggunakan pengeluar Kafka dalam Beego boleh menghantar data dengan mudah ke gugusan Kafka Berikut ialah cara menggunakan pengeluar Kafka dalam Beego Contoh:
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 创建 Kafka 消息 msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello, World!"), } // 发送消息 partition, offset, err := producer.SendMessage(msg) if err != nil { // 处理错误情况 panic(err) } fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中 ", partition, offset) // 关闭 Kafka 生产者 producer.Close() }
Menggunakan pengguna Kafka dalam Beego boleh mendapatkan data dengan mudah daripada gugusan Kafka Berikut ialah cara menggunakannya dalam Beego Contoh pengguna Kafka:
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 订阅 Topic partitions, err := consumer.Partitions("test") if err != nil { // 处理错误情况 panic(err) } for _, partition := range partitions { // 从分区的开头读取数据 partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest) // 处理数据 go func(partitionConsumer sarama.PartitionConsumer) { for { select { case msg := <-partitionConsumer.Messages(): // 处理消息 fmt.Printf("收到消息: %v", string(msg.Value)) } } }(partitionConsumer) } // 关闭 Kafka 消费者 defer consumer.Close() }
. 4. Flink dalam Beego
Menggunakan Flink dalam Beego boleh dilakukan terus melalui API Java Flink, melalui interaksi Cgo antara Java dan Go Complete keseluruhan proses. Di bawah ialah contoh mudah daripada Flink di mana kekerapan setiap perkataan teks Socket dikira melalui pemprosesan strim masa nyata. Dalam contoh ini, kami membaca aliran data teks yang diberikan ke dalam Flink, kemudian menggunakan operator Flink untuk beroperasi pada aliran data, dan akhirnya mengeluarkan hasilnya ke konsol.
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; public class SocketTextStreamFunction implements SourceFunction<String> { private final String hostname; private final int port; public SocketTextStreamFunction(String hostname, int port) { this.hostname = hostname; this.port = port; } public void run(SourceContext<String> context) throws Exception { Socket socket = new Socket(hostname, port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line; while ((line = reader.readLine()) != null) { context.collect(line); } reader.close(); socket.close(); } public void cancel() {} }
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { String hostname = "localhost"; int port = 9999; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取数据流 DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port)); // 计算每个单词的出现频率 DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.toLowerCase().split("\W+"); for (String word : words) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }) .keyBy(0) .timeWindow(Time.seconds(5)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> t : input) { sum += t.f1; } out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum)); } }); // 打印到控制台 wordCounts.print(); env.execute("Socket Text Stream Word Count"); } }
5
Artikel ini memperkenalkan cara menggunakan Kafka dan Flink dalam Beego untuk pemprosesan strim masa nyata. Kafka boleh digunakan sebagai sistem pemesejan yang boleh dipercayai untuk menyelesaikan masalah komunikasi antara berbilang komponen dalam sistem teragih dan penghantaran mesej yang boleh dipercayai. Flink ialah rangka kerja pemprosesan strim data besar yang didorong oleh peristiwa, diedarkan, dan berprestasi tinggi. Dalam aplikasi praktikal, kami boleh memilih secara fleksibel untuk menggunakan teknologi seperti Kafka dan Flink berdasarkan keperluan khusus untuk menyelesaikan cabaran dalam pemprosesan data masa nyata berskala besar.Atas ialah kandungan terperinci Pemprosesan strim masa nyata menggunakan Kafka dan Flink dalam Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!