ホームページ >Java >&#&チュートリアル >Javaリアクティブプログラミングとは何ですか?
推奨チュートリアル: 「java ビデオ チュートリアル 」
Java レスポンシブ プログラミングとは何ですか?
Java リアクティブ プログラミングは
レスポンシブ プログラミング
リアクティブ プログラミングの方向性の最初のものとして最初のステップとして、Microsoft は .NET エコシステムに Rx ライブラリ (Reactive Extensions) を作成しました。 RxJava は JVM 上での実装です。
リアクティブ プログラミングは、通常、オブザーバー パターンの拡張としてオブジェクト指向言語に現れる非同期プログラミング パラダイムです。
データの流れと変更の伝播に焦点を当てています。これは、静的 (配列など) または動的 (イベント エミッターなど) のデータ フローをプログラミング言語を使用して簡単に表現できることを意味します。
レスポンシブ ストリーミング
時間が経つにつれ、Java に特化した標準化が行われました。これは、JVM プラットフォーム上のリアクティブ ライブラリのいくつかのインターフェイスと相互作用ルールを定義する仕様です。これは Reactive Streams であり、そのインターフェイスは Java 9 の親クラス java.util.concurrent.Flow に統合されています。リアクティブ ストリームはイテレータに似ていますが、イテレータは「プル」に基づいているのに対し、リアクティブ ストリームは「プッシュ」に基づいています。次の要素を取得するために next() をいつ呼び出すかは開発者次第であるため、イテレータの使用は実際には命令型プログラミングです。リアクティブ ストリーミングでは、上記に相当するのはパブリッシャーとサブスクライバーです。ただし、新しい要素が利用可能になると、パブリッシャーによってサブスクライバーにプッシュされます。この「プッシュ」が応答性の鍵となります。
また、プッシュされた要素の操作も宣言的に行われるため、プログラマは何を行うかを表現するだけでよく、どのように行うかを考える必要がありません。
パブリッシャーは、onNext メソッドを使用して新しい要素をサブスクライバーにプッシュし、onError メソッドを使用してエラーを通知し、onComplete メソッドを使用して終了を通知します。
エラー処理や完了(終了)もうまく処理されていることがわかります。エラーと終了の両方でシーケンスを終了できます。
この方法は非常に柔軟です。このモードは、要素 0 (なし) / 要素 1 / n (多数) のケースをサポートします (時計が動いている場合、無限シーケンスを含む)。
Reactor のデビュー
Reactor は第 4 世代のリアクティブ ライブラリであり、リアクティブ プログラミング パラダイムの実装であり、リアクティブ ストリーム仕様に基づいて JVM プラットフォーム上に構築するために使用されます。 - 非同期アプリケーションのブロック。
JVM (http://www.reactive-streams.org/) 上のリアクティブ ストリームの仕様を大幅に実装します。
これは、(「バック プレッシャー」の管理という形で) 効率的な需要管理を備えた、完全にノンブロッキングなリアクティブ プログラミングの基礎です。
Java 関数 API、特に CompletableFuture、Stream、Duration を直接統合します。
これは、reactor-netty プロジェクトを使用してノンブロッキングのクロスプロセス通信を実現することをサポートしており、マイクロサービス アーキテクチャに適しており、HTTP (Websocket を含む)、TCP、UDP をサポートしています。
注: Reactor には Java 8 が必要です
ここまで述べてきましたが、まず、なぜこのような非同期リアクティブ ライブラリが必要なのかを考えるべきでしょうか?
ブロックは無駄です
最新のアプリケーションは非常に多くの同時ユーザーにアクセスできます。最新のハードウェアの機能が継続的に向上しているとしても、最新のソフトウェアのパフォーマンスはまだ重要な懸念点
プログラムのパフォーマンスを向上させるには、一般に 2 つの方法があります:
1. より多くのスレッドとより多くのハードウェア リソースを使用する並列化
2、効率を改善し、現在のリソース使用量の下でより高い効率を追求します
通常、Java 開発者はプログラムを作成するためにブロッキング コードを使用します。これは、パフォーマンスのボトルネックに達するまではうまく機能します。
現時点では、同様のブロック コードを実行するために追加のスレッドが導入されます。ただし、このスケーリング手法は議論の余地があり、リソース使用率の点で同時実行の問題が発生する可能性があります。
さらに悪いことに、ブロックするとリソースが無駄になります。よく見ると、プログラムに何らかの遅延 (特にデータベース要求やネットワーク呼び出しなどの I/O) が発生すると、スレッドがアイドル状態になってデータを待機するため、リソースが無駄に消費されます。
つまり、並列化は特効薬ではありません。ハードウェアの能力を最大限に発揮させることが必要ですが、リソースの浪費による影響や原因も非常に複雑です。
非同期性による解決策
前述の 2 番目の方法は、より高い効率を追求することであり、これはリソースの無駄の問題の解決策として使用できます。
非同期のノンブロッキング コードを記述することで、同じ基盤リソースを使用して実行を他のアクティブなタスクに切り替え、後で現在のプロセスに戻ることができます。
しかし、JVM に非同期コードを生成するにはどうすればよいでしょうか? Java は 2 つの非同期プログラミング モデルを提供します:
1. コールバック: 非同期メソッドには戻り値はありませんが、コールバックが発生します。コールバックは結果が利用可能になると呼び出されます。
2. Futures、非同期メソッドは即座に Future8742468051c85b06f0a0af9e3e506b5c を返します。非同期処理プロセスでは、T 値を計算し、Future オブジェクトを使用してそのアクセスをラップします。この値はすぐには使用できません。オブジェクトをポーリングして、T 値が使用可能かどうかを確認できます。
どちらのテクノロジーも十分に優れていますか?あらゆる状況に完璧に対応できるわけではなく、どちらの方法にも制限があります。
コールバックはまとめるのが難しく、すぐに読みにくく保守しにくいコードになってしまう可能性があります (よく知られている「コールバック地獄」)。
Future はコールバックよりもわずかに優れていますが、それでも構成がうまくいきません。複数の Futures オブジェクトを結合することは可能ですが、簡単ではありません。
将来
他にも問題があり、get() メソッドの呼び出しにより別のブロックが発生しやすくなります。
さらに、遅延計算はサポートされておらず、複数の値のサポートがなく、高度なエラー処理もありません。
命令型プログラミングからリアクティブ プログラミングへ
Reactor のようなリアクティブ ライブラリの目標は、JVM 上の「従来の」非同期アプローチの欠点を解決すると同時に、次の点にも注意を払うことです。追加の側面:
構成性と読みやすさ。
ストリームとしてのデータは、豊富なオペレーターによって操作され、サブスクライブしてポストプレスし、消費者が排出速度が速すぎること、高度な数値抽象化ではなく高レベルであることをプロデューサーに通知するまで、何も起こりません。
構成可能性と可読性
構成可能性とは、実際には、複数の非同期タスクを調整して、前のタスクの結果を後続のタスクの入力として使用できるようにする機能です。フォーク結合 (フォークマージ) 方式で複数のタスクを実行するか、これらの非同期タスクをより高いレベルで再利用します。
タスク オーケストレーションの機能は、コードの可読性と保守性と密接に関係しています。非同期処理の数と複雑さが増加するにつれて、コードの作成と読み取りがより困難になります。
ご覧のとおり、コールバック モデルはシンプルですが、コールバックがコールバック内にネストされ、複数のレベルに達すると、コールバック地獄となります。
Reactor は、ネスト レベルを最小限に抑えるための豊富な組み合わせオプションを提供します。これにより、コードの組織構造は、処理される抽象化の種類を反映でき、通常は同じレベルに保たれます。
組立ラインのアナロジー
データを処理するリアクティブ アプリケーションは、データが組立 (生産) ラインを通過するかのように考えることができます。 Reactor はベルトコンベアでもあり、ワークステーションでもあります。
原材料はソース (オリジナルの発行者) から継続的に取得され、最終的に完成した製品が消費者 (購読者) にプッシュされます。
原材料は、さまざまな変換を経たり、他の中間ステップになったり、大規模な組立ラインの一部になったりすることがあります。
どこかに不具合や障害がある場合、問題のワークステーションは上流に通知を送信して、原材料のフロー (速度) を制限することができます。
オペレーター
Reactor では、オペレーターは組立ラインに例えるとワークステーションです。各オペレーターはパブリッシャーに何らかの動作を追加し、以前のパブリッシャーを新しいインスタンスにラップします。このようにしてチェーン全体がリンクされます。
つまり、データは最初に最初のパブリッシャーから取得され、その後チェーンに沿って下流に移動し、各リンクによって変換されます。最後に、加入者はプロセスを終了します。
リアクティブ ストリームの仕様では演算子が明確に規定されていませんが、Reactor は単純な変換やフィルタリングから複雑なオーケストレーションやエラー処理まで、多くの側面を含む豊富な演算子のセットを提供します。
サブスクライブしない限り、何も起こりません
パブリッシャー チェーンを作成する場合、デフォルトでは、データはチェーンに入力され始めません。代わりに、非同期処理の抽象的な記述を作成するだけです。
この動作 (アクション) をサブスクライブすることにより、パブリッシャーとサブスクライバーが接続され、データがチェーン内を流れるようにトリガーされます。
これは内部で実装され、サブスクライバーからのリクエスト信号を通じて上流に伝播し、元のパブリッシャーまでずっと上流に伝播します。
Reactor の主な機能
Reactor は、コンポーザブルなリアクティブ型を導入し、パブリッシャー インターフェイスを実装しますが、Flux と Mono という豊富な演算子のセットも提供します。
Flux
(フロー) は、0 から N 個の要素を表します。
Mono
(単一) は、0 または 1 つの要素を表します。
これらの違いは主に意味的なものであり、非同期処理の大まかな濃度を示しています。
http リクエストが 1 つの応答のみを生成する場合、Monob9c6304980a6413006d7713f6edf94ae として表現する方が明らかに意味があり、カウント操作は明らかに 0/1 などのコンテキストに関連する演算子のみを提供します。あまり意味がありません。
演算子は、処理の最大カーディナリティを変更でき、関連するタイプにも切り替えます。たとえば、Flux8742468051c85b06f0a0af9e3e506b5c には count 演算子が存在しますが、その戻り値は Monoad84a9ca022f0fe586e370cd3963bff2 です。
Flux8742468051c85b06f0a0af9e3e506b5c
A Flux8742468051c85b06f0a0af9e3e506b5c は標準の Publisher8742468051c85b06f0a0af9e3e506b5c であり、0 から N 個の要素を発行できる非同期シーケンスを表します。完了信号またはエラー信号。
リアクティブ ストリーミング仕様と同様に、これら 3 種類の信号は、ダウンストリーム サブスクライバの onNext、onComplete、および onError メソッドへの呼び出しに変換されます。
これら 3 つのメソッドはイベント/コールバックとしても理解でき、すべてオプションです。
onNext がなく、onComplete がある場合、それは空の有限シーケンスを表します。 onNext も onComplete もなく、空の無限シーケンス (実用的ではなく、テストに使用できます) を示します。
無限シーケンスは空である必要はありません。たとえば、Flux.interval(Duration) は、無限でクロックから発せられる通常の「ティック」である Fluxad84a9ca022f0fe586e370cd3963bff2 を生成します。
Mono8742468051c85b06f0a0af9e3e506b5c
Mono8742468051c85b06f0a0af9e3e506b5c は、最大 1 つの要素を発行し、onComplete 信号またはonError 信号。
提供されるオペレーターは、Flux が提供するオペレーターのサブセットにすぎません。同様に、一部のオペレーター (Mono と Publisher の組み合わせなど) は、Flux に切り替えることができます。
たとえば、Mono#concatWith(Publisher) は Flux を返しますが、Mono#then(Mono) は別の Mono を返します。
Mono は、Mono903bf37051cf83cbd9686768ac0189ae で表される、戻り値のない非同期処理 (Runnable と同様) を表すために使用できます。
Flux または Mono を作成してサブスクライブする
最も簡単な方法は、それぞれのファクトリ メソッドを使用することです:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
サブスクリプションに関しては、Java を使用できます。 8 ラムダ式、サブスクリプション メソッドには、さまざまなコールバックを備えたさまざまなバリエーションが多数あります。
次はメソッド シグネチャです:
//订阅并触发序列 subscribe(); //可以对每一个产生的值进行处理 subscribe(Consumer<? super T> consumer); //还可以响应一个错误 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //还可以在成功结束后执行一些代码 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //还可以对Subscription执行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
Disposable を使用してサブスクライブを解除します
これらのラムダベースのサブスクリプション メソッドはすべて、そのメソッドを呼び出すことによって、Disposable 型を返します。このサブスクリプションをキャンセルするには、dispose() を使用します。
Flux と Mono の場合、キャンセルはソースが要素の生成を停止する必要があるという信号です。ただし、即時効果は保証されておらず、ソースによっては要素の生成が非常に早く、キャンセル信号を受信する前に終了してしまう場合もあります。
おすすめ関連記事:「java開発チュートリアル」
以上がJavaリアクティブプログラミングとは何ですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。