ホームページ >Java >&#&チュートリアル >Java 開発: ストリーム処理とデータ転送に Akka Streams を使用する方法

Java 開発: ストリーム処理とデータ転送に Akka Streams を使用する方法

WBOY
WBOYオリジナル
2023-09-22 08:30:26999ブラウズ

Java开发:如何使用Akka Streams进行流处理和数据传输

Java 開発: ストリーム処理とデータ送信に Akka Streams を使用する方法

はじめに:
ビッグ データとリアルタイム データ処理の急速な発展に伴い、ストリーム処理とデータ送信のニーズが増加しています。 Java 開発において、Akka Streams は、ストリーム処理とデータ転送の実装を簡素化する強力なライブラリです。この記事では、Akka Streams の基本概念と使用法を紹介し、詳細なコード例を示します。

1. Akka Streams の概要:
1.1 Akka Streams とは:
Akka Streams は Akka フレームワークの一部であり、非同期で構成可能かつ監視可能なストリーム処理モデルを提供します。バックプレッシャー メカニズムを使用して、データ ストリームの一貫性のない速度を処理します。 Akka Streams は拡張性と柔軟性が高く、大規模なデータ ストリームを簡単に処理できます。

1.2 基本概念:

  • ソース: ファイル、データベース、ネットワーク接続などのデータ フローのソース。ソースは 0 個以上のデータ要素を発行できます。
  • フロー: フィルタリング、マッピング、集計など、データ フローを操作および変換するコンポーネント。フローは 1 つ以上のデータ要素を受信し、1 つ以上のデータ要素を出力できます。
  • シンク: データ フローのエンド ポイント。ファイル、データベース、ネットワーク接続などが考えられます。エンドポイントはフローで処理されたデータを受け取り、処理します。

2. Akka Streams の使用:
2.1 依存関係の導入:
最初に、Akka Streams の依存関係を Java プロジェクトに導入する必要があります。 pom.xml ファイルに次の依存関係を追加します。

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.17</version>
</dependency>

2.2 単純なストリーム処理を実装します。
以下では、簡単な例を使用して、ストリーム処理に Akka Streams を使用する方法を示します。

まず、整数を含むデータ ソースを作成します:

Source<Integer, NotUsed> source = Source.range(1, 10);

次に、ソース データを 2 倍するフローを作成します:

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);

次に、シンクから受信ストリームを作成します。処理されたデータ:

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

ソース、フロー、シンクを結合して完全なストリーム処理を構築します:

RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);

最後に、ストリーム処理を実行します:

CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);

上記のコードでは、別のコードを使用します。データ ソース、フロー、シンクなどの単純なストリーム処理を実装するために Akka Streams によって提供されるコンポーネント。これらのコンポーネントを接続することで、完全なストリーム処理プロセスを定義して実行できます。

2.3 データ送信の実装:
ストリーム処理に加えて、Akka Streams はデータ送信にも使用できます。以下では、TCP 送信を例として、Akka Streams をデータ送信に使用する方法を示します。

まず、サーバー側のストリーム処理を作成します:

final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

次に、サーバーを起動します:

final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource =
    Tcp().bind("localhost", 8888);

final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create()
    .mapAsync(1, connection -> {
        connection.handleWith(serverFlow, materializer);
        return CompletableFuture.completedFuture(connection);
    });

final CompletionStage<Tcp.ServerBinding> binding =
    serverSource.via(handler).to(Sink.ignore()).run(materializer);

次に、クライアント側のストリーム処理を作成します:

final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore();

final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow =
    Tcp().outgoingConnection("localhost", 8888);

final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow =
    Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right());

CompletableFuture<Tcp.OutgoingConnection> connection =
    Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);

上記のコードにより、サーバー側のストリーム処理とクライアント側のストリーム処理を作成し、TCPでデータを送信します。サーバー側のストリーム処理では、受信した文字列を処理してクライアントに送信します。クライアント側のストリーム処理では、受信した文字列を処理してサーバーに送信します。

概要:
この記事では、Akka Streams の基本概念と使用法を紹介し、詳細なコード例を示します。 Akka Streams を通じて、ストリーム処理とデータ送信を簡単に実装でき、データ処理の効率とパフォーマンスが向上します。この記事が、Java 開発におけるストリーム処理とデータ送信に Akka Streams を使用するのに役立つことを願っています。

以上がJava 開発: ストリーム処理とデータ転送に Akka Streams を使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。