ホームページ >Java >&#&チュートリアル >java9 Reactive Stream 応答性プログラミング API の新機能の使用方法

java9 Reactive Stream 応答性プログラミング API の新機能の使用方法

王林
王林転載
2023-05-10 13:46:061525ブラウズ

1. Java9 Reactive Stream API

Java 9 は、リアクティブ ストリーム プログラミングを定義する一連のインターフェイスを提供します。これらすべてのインターフェースは、java.util.concurrent.Flow クラスで静的内部インターフェースとして定義されます。

java9 Reactive Stream 応答性プログラミング API の新機能の使用方法

Java リアクティブ プログラミングにおける重要な役割と概念を以下に示します。まず簡単に理解しましょう。

パブリッシャー (発行者) は、潜在的に無制限の数のプロデューサーです。順序付けされたデータ要素。受信した要求 (サブスクリプション) に基づいて、特定の数のデータ要素を現在のサブスクライバーに公開します。

サブスクライバーは、パブリッシャーからデータ要素をサブスクライブして受信します。パブリッシャーとのサブスクリプション関係を確立した後、パブリッシャーはサブスクリプション トークン (サブスクリプション) をサブスクライバーに送信し、サブスクライバーは自身の処理能力に基づいてパブリッシャーによってパブリッシュされたデータ要素の数を要求できます。

サブスクリプション トークン (サブスクリプション) は、サブスクライバーとパブリッシャーの間に確立されたサブスクリプション関係を表します。サブスクリプション関係が確立されると、パブリッシャーはそれをサブスクライバーに渡します。サブスクライバーはサブスクリプション トークンを使用して、データ要素の数の要求やサブスクライブの解除など、パブリッシャーと対話します。

2. Java 応答プログラミングの 4 つの主要なインターフェイス

2.1. サブスクライバー インターフェイス

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe: サブスクライバーのサブスクリプション後にパブリッシャーが Called を受け入れるときアクションを実行し、サブスクリプション メッセージを公開する前に実行します。新しく作成された Subscription サブスクリプション トークン オブジェクトは、このメソッドを通じてサブスクライバーに渡されます。

onNext: 次に処理されるデータ項目の処理関数

onError: パブリッシャーまたはサブスクリプションで回復不能なエラーが発生したとき Call

onComplete: サブスクライバ呼び出し (onNext() メソッドを含む) が発生しないときに呼び出されます。

2.2.サブスクリプション インターフェイス (サブスクリプション トークン インターフェイス)

サブスクリプション トークン オブジェクトは、Subscriber.onSubscribe() メソッド

public static interface Subscription {    public void request(long n);    public void cancel();}

## を通じて渡されます。 # request(long n) は、ノンブロッキング バックプレッシャーの概念の背後にある重要なメソッドです。加入者はこれを使用して、n 個を超える消費アイテムをリクエストします。このようにして、加入者は現在受信できるデータ量を制御します。 cancel()サブスクライバは自らサブスクリプションをキャンセルします。キャンセル後はデータ メッセージを受信しなくなります。

2.3.パブリッシャー インターフェイス

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

このメソッドを呼び出して、サブスクライバーとパブリッシャーの間のメッセージ サブスクリプション関係を確立します。

2.4.プロセッサ インターフェイス

プロセッサはサブスクライバとパブリッシャーの両方として機能し、パブリッシャーとサブスクライバーのパイプラインで要素を変換する役割を果たします。パブリッシャーからタイプ T のデータ要素を受信して​​タイプ R のデータに変換し、それらをパブリッシュするために使用されます。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

2. 実際のケース

リアクティブ プログラミングを完了するには、上記の 4 つのインターフェイスを実装する必要があります

サブスクリプション インターフェイスサブスクリプション トークン インターフェイス通常はこれを実行します。これを実装するために自分自身をプログラムする必要はありません。 request() メソッドと cancel() メソッドの意味を知るだけで済みます。

Publisher インターフェイスPublisher インターフェイスである Java 9 では、デフォルトで SubmissionPublisher の実装が提供されています。Publisher インターフェイスを実装するメソッドに加えて、この実装クラスは submit( ) メッセージデータの送信が完了します。

サブスクライバ インターフェイスサブスクライバ インターフェイスは通常、自分で実装する必要があります。データサブスクリプションを受け取った後の処理ロジックは企業ごとに異なるためです。

Processor は、実際にはパブリッシャー インターフェイスとサブスクライバー インターフェイスのコレクションです。このインターフェイスは、データ型の変換とデータ処理が必要な場合にのみ実装する必要があります。

次の例実装されています 文字列データ メッセージのサブスクリプション処理

サブスクライバー サブスクライバー インターフェイスの実装

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher メッセージ パブリッシャー

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}

コンソールの出力結果

#サブスクリプション関係の確立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39

item: データ 1
item: データ 2

注意: パブリッシャーが 3 つの部分を送信した後であったとしてものデータのうち、MySubscriber は処理のために 2 つのデータのみを受信しました。これは、
MySubscriber#onSubscribe()

メソッドで subscription.request(2); を使用したためです。これは「バック プレッシャー」のリアクティブ プログラミング効果です。処理できるデータ量に応じて、メッセージ発行者にどのくらいのデータを提供するかを通知します。

以上がjava9 Reactive Stream 応答性プログラミング API の新機能の使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。