ホームページ >バックエンド開発 >Golang >Beego で Kafka と Flink を使用したリアルタイム ストリーム処理

Beego で Kafka と Flink を使用したリアルタイム ストリーム処理

WBOY
WBOYオリジナル
2023-06-22 16:18:331516ブラウズ

ビッグデータ時代の到来により、多くの場合、リアルタイム データの処理と分析が必要になります。リアルタイム ストリーム処理テクノロジは、その高性能、高いスケーラビリティ、低遅延により、大規模なリアルタイム データを処理するための主流の方法となっています。リアルタイム ストリーム処理テクノロジでは、Kafka と Flink が一般的なコンポーネントであり、多くのエンタープライズ レベルのデータ処理システムで広く使用されています。この記事では、Beego で Kafka と Flink を使用してリアルタイム ストリーム処理を行う方法を紹介します。

1. Kafka の概要

Apache Kafka は、分散ストリーム処理プラットフォームです。データをストリーム (ストリーミング データ) に分離し、複数のノードにデータを分散することで、高性能、高可用性、高スケーラビリティ、および Exactly-Once 保証などの高度な機能を提供します。 Kafka の主な役割は、分散システム内の複数のコンポーネント間の通信の問題を解決し、メッセージを確実に送信するために使用できる、信頼性の高いメッセージング システムとして機能することです。

2. Flink の概要

Flink は、イベント駆動型の分散型高性能ビッグ データ ストリーム処理フレームワークです。ストリームとバッチ処理をサポートし、SQL のようなクエリとストリーム処理機能を備え、高度に構成可能なストリーミング コンピューティングをサポートし、豊富なウィンドウとデータ ストレージをサポートします。

3. Beego での Kafka

Beego での Kafka の使用は、主に、Kafka コンシューマーと Kafka プロデューサーの 2 つの部分に分かれています。

  1. Kafka プロデューサー

Beego で Kafka プロデューサーを使用すると、簡単にデータを Kafka クラスターに送信できます。Beego で Kafka プロデューサーを使用する方法は次のとおりです。例:

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 生产者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 创建 Kafka 消息
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("Hello, World!"),
    }

    // 发送消息
    partition, offset, err := producer.SendMessage(msg)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中
", partition, offset)

    // 关闭 Kafka 生产者
    producer.Close()
}
  1. Kafka Consumer

Beego で Kafka コンシューマーを使用すると、Kafka クラスターからデータを簡単に取得できます。Beego での使用方法は次のとおりです。Kafka コンシューマーの例:

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 订阅 Topic
    partitions, err := consumer.Partitions("test")

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    for _, partition := range partitions {
        // 从分区的开头读取数据
        partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest)

        // 处理数据
        go func(partitionConsumer sarama.PartitionConsumer) {
            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    // 处理消息
                    fmt.Printf("收到消息: %v", string(msg.Value))
                }
            }
        }(partitionConsumer)
    }

    // 关闭 Kafka 消费者
    defer consumer.Close()
}

4. Beego での Flink

Beego での Flink の使用は、Flink の Java API、Java と Go 間の Cgo 対話を通じて直接実行できます。プロセス全体を完了します。以下は、Flink の簡単な例です。ここでは、各ソケット テキスト単語の頻度がリアルタイム ストリーム処理によって計算されます。この例では、指定されたテキスト データ ストリームを Flink に読み取り、Flink の演算子を使用してデータ ストリームを操作し、最後に結果をコンソールに出力します。

  1. Socket テキスト データ ソースの作成
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class SocketTextStreamFunction implements SourceFunction<String> {
    private final String hostname;
    private final int port;

    public SocketTextStreamFunction(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    public void run(SourceContext<String> context) throws Exception {
        Socket socket = new Socket(hostname, port);
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String line;
        while ((line = reader.readLine()) != null) {
            context.collect(line);
        }
        reader.close();
        socket.close();
    }

    public void cancel() {}
}
  1. 各単語の頻度の計算
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        String hostname = "localhost";
        int port = 9999;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Socket 中读取数据流
        DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port));

        // 计算每个单词的出现频率
        DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.toLowerCase().split("\W+");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t : input) {
                            sum += t.f1;
                        }
                        out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum));
                    }
                });

        // 打印到控制台
        wordCounts.print();

        env.execute("Socket Text Stream Word Count");
    }
}

5. 結論

この記事では、Beego で Kafka と Flink を使用してリアルタイム ストリーム処理を行う方法を紹介します。 Kafka は、分散システム内の複数のコンポーネント間の通信の問題を解決し、メッセージを確実に送信するための信頼できるメッセージング システムとして使用できます。 Flink は、イベント駆動型の分散型高性能ビッグ データ ストリーム処理フレームワークです。実際のアプリケーションでは、大規模なリアルタイム データ処理の課題を解決するために、特定のニーズに基づいて Kafka や Flink などのテクノロジの使用を柔軟に選択できます。

以上がBeego で Kafka と Flink を使用したリアルタイム ストリーム処理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。