ビッグデータ時代の到来により、多くの場合、リアルタイム データの処理と分析が必要になります。リアルタイム ストリーム処理テクノロジは、その高性能、高いスケーラビリティ、低遅延により、大規模なリアルタイム データを処理するための主流の方法となっています。リアルタイム ストリーム処理テクノロジでは、Kafka と Flink が一般的なコンポーネントであり、多くのエンタープライズ レベルのデータ処理システムで広く使用されています。この記事では、Beego で Kafka と Flink を使用してリアルタイム ストリーム処理を行う方法を紹介します。
1. Kafka の概要
Apache Kafka は、分散ストリーム処理プラットフォームです。データをストリーム (ストリーミング データ) に分離し、複数のノードにデータを分散することで、高性能、高可用性、高スケーラビリティ、および Exactly-Once 保証などの高度な機能を提供します。 Kafka の主な役割は、分散システム内の複数のコンポーネント間の通信の問題を解決し、メッセージを確実に送信するために使用できる、信頼性の高いメッセージング システムとして機能することです。
2. Flink の概要
Flink は、イベント駆動型の分散型高性能ビッグ データ ストリーム処理フレームワークです。ストリームとバッチ処理をサポートし、SQL のようなクエリとストリーム処理機能を備え、高度に構成可能なストリーミング コンピューティングをサポートし、豊富なウィンドウとデータ ストレージをサポートします。
3. Beego での Kafka
Beego での Kafka の使用は、主に、Kafka コンシューマーと Kafka プロデューサーの 2 つの部分に分かれています。
Beego で Kafka プロデューサーを使用すると、簡単にデータを Kafka クラスターに送信できます。Beego で Kafka プロデューサーを使用する方法は次のとおりです。例:
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 创建 Kafka 消息 msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello, World!"), } // 发送消息 partition, offset, err := producer.SendMessage(msg) if err != nil { // 处理错误情况 panic(err) } fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中 ", partition, offset) // 关闭 Kafka 生产者 producer.Close() }
Beego で Kafka コンシューマーを使用すると、Kafka クラスターからデータを簡単に取得できます。Beego での使用方法は次のとおりです。Kafka コンシューマーの例:
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 订阅 Topic partitions, err := consumer.Partitions("test") if err != nil { // 处理错误情况 panic(err) } for _, partition := range partitions { // 从分区的开头读取数据 partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest) // 处理数据 go func(partitionConsumer sarama.PartitionConsumer) { for { select { case msg := <-partitionConsumer.Messages(): // 处理消息 fmt.Printf("收到消息: %v", string(msg.Value)) } } }(partitionConsumer) } // 关闭 Kafka 消费者 defer consumer.Close() }
4. Beego での Flink
Beego での Flink の使用は、Flink の Java API、Java と Go 間の Cgo 対話を通じて直接実行できます。プロセス全体を完了します。以下は、Flink の簡単な例です。ここでは、各ソケット テキスト単語の頻度がリアルタイム ストリーム処理によって計算されます。この例では、指定されたテキスト データ ストリームを Flink に読み取り、Flink の演算子を使用してデータ ストリームを操作し、最後に結果をコンソールに出力します。
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; public class SocketTextStreamFunction implements SourceFunction<String> { private final String hostname; private final int port; public SocketTextStreamFunction(String hostname, int port) { this.hostname = hostname; this.port = port; } public void run(SourceContext<String> context) throws Exception { Socket socket = new Socket(hostname, port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line; while ((line = reader.readLine()) != null) { context.collect(line); } reader.close(); socket.close(); } public void cancel() {} }
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { String hostname = "localhost"; int port = 9999; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取数据流 DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port)); // 计算每个单词的出现频率 DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.toLowerCase().split("\W+"); for (String word : words) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }) .keyBy(0) .timeWindow(Time.seconds(5)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> t : input) { sum += t.f1; } out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum)); } }); // 打印到控制台 wordCounts.print(); env.execute("Socket Text Stream Word Count"); } }
5. 結論
この記事では、Beego で Kafka と Flink を使用してリアルタイム ストリーム処理を行う方法を紹介します。 Kafka は、分散システム内の複数のコンポーネント間の通信の問題を解決し、メッセージを確実に送信するための信頼できるメッセージング システムとして使用できます。 Flink は、イベント駆動型の分散型高性能ビッグ データ ストリーム処理フレームワークです。実際のアプリケーションでは、大規模なリアルタイム データ処理の課題を解決するために、特定のニーズに基づいて Kafka や Flink などのテクノロジの使用を柔軟に選択できます。
以上がBeego で Kafka と Flink を使用したリアルタイム ストリーム処理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。