首頁  >  文章  >  Java  >  java響應式程式設計是什麼?

java響應式程式設計是什麼?

coldplay.xixi
coldplay.xixi原創
2020-06-22 11:47:123396瀏覽

java響應式程式設計是什麼?

推薦教學:《java影片教學

#java響應式程式設計是什麼?

java響應式程式設計是

響應式程式設計

作為響應式程式設計方向上的第一步,微軟在.NET生態系統中創建了Rx函式庫(Reactive Extensions)。 RxJava是在JVM上對它的實作。

響應式程式設計是一個非同步程式設計範式,通常出現在物件導向的語言中,作為觀察者模式的一個擴展。

它關注數據的流動、變化的傳播。這意味著可以輕易地使用程式語言表示靜態(如陣列)或動態(如事件發射源)資料流。

響應式串流

      隨著時間的推移,一個專門為Java的標準化出現了。它是一個規範,定義了一些介面和互動規則,用於JVM平台上的響應式函式庫。它就是響應式流(Reactive Streams),它的這些介面已經被整合到Java 9裡,在java.util.concurrent.Flow這個父類別裡。響應式流和迭代器較相似,不過迭代器是基於「拉」(pull)的,而響應式流是基於「推」(push)的。迭代器的使用其實是命令式編程,因為由開發者決定何時調用next()來取得下一個元素。在響應式串流中,與上面等價的是發布者-訂閱者。但當有新的可用元素時,是由發布者推給訂閱者的。這個「推」就是響應式的關鍵。

        另外,被推過來元素的操作也是以宣告的方式進行的,程式設計師只需表達做什麼就行了,不需要管怎麼做。

發布者使用onNext方法向訂閱者推送新元素,使用onError方法告知錯誤,使用onComplete方法告知已經結束。

        可見,錯誤處理和完成(結束)也是以一個很好的方式被處理。錯誤和結束都可以終止序列。

         這種方式非常有彈性。這種模式支援0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,如果滴答的鐘錶)這些情況。

Reactor粉墨登場

Reactor是第四代響應式函式庫,是一個響應式程式設計範式的實現,用於在JVM平台上基於響應式串流規範構建非阻塞異步應用。

它大大實現了JVM上響應式串流的規範(http://www.reactive-streams.org/)。

它是一個完全非阻塞響應式程式設計的基石,帶有高效需求管理(以管理「後壓」的形式)。

它直接整合Java函式API,特別是CompletableFuture,Stream和Duration。

它支援使用reactor-netty工程實現非阻塞跨進程通信,適合微服務架構,支援HTTP(包括Websockets),TCP和UDP。

註:Reactor要Java 8

講了這麼多,是不是要先思考下,為什麼我們需要這樣一個非同步的響應式函式庫?

阻塞就是浪費

現代的應用能達到非常多的並髮用戶,即使現代硬體的能力被持續改進,現代軟體的性能仍然是一個關鍵的關注點

大體上有兩種方式可以改進一個程式的效能:

1、並行化,使用更多的執行緒和更多的硬體資源

#2 、提高效率,在目前資源用量的情況下尋求更高效率

通常,Java開發者使用阻塞程式碼來寫程式。這種實踐性很好,直到遇到效能瓶頸。

此時會引入額外線程,並執行相似的阻塞程式碼。但是這種擴展方法在資源利用方面會引起爭論和導致並發問題。

更糟的是,阻塞浪費資源。如果你仔細看,一旦一個程式涉及到一些延遲(特別是I/O,像資料庫請求或網路呼叫),資源就被浪費,因為線程現在是空閒的,在等待資料。

所以並行化方式不是銀彈。我們有必要讓硬體發揮完全的力量,但是關於資源浪費的影響和原因也是非常複雜的。

非同步性來營救

前面提到的第二種方式是尋求更高效率,可以作為資源浪費問題的一個解決方案。

透過寫非同步非阻塞程式碼,你能讓執行切換到其它活動的任務,使用相同的底層資源,稍後再回到目前的處理上。

但是要如何產生非同步程式碼到JVM上呢? Java提供兩種非同步程式設計模型:

1、Callbacks,非同步方法沒有回傳值,但是會帶一個回調,當結果可用時回調會被呼叫。

2、Futures,非同步方法立即傳回一個Future8742468051c85b06f0a0af9e3e506b5c,非同步處理過程就是計算一個T值,使用Future物件包裝了對它的存取。這個值不是立即可用的,該物件可以被輪詢來查看T值是否可用。

這兩種技術都夠好嗎?並不是對每種情況都是的,兩種方式都有限制。

回呼比較難於組合在一起,很快就會導致程式碼難以閱讀和維護(眾所周知的「回調地獄」)。

與回呼相比,Futures稍微好一點,但仍然在組合方面做得不好。組合多個Futures物件到一起是可行的但是並不容易。

Future也有其它問題,很容易因為呼叫了get()方法造成了另一個阻塞。

另外,它不支援延遲計算,缺乏對多個值的支持,缺乏高階錯誤處理。

從命令式到響應式程式設計

像Reactor這樣的響應式函式庫的目標就是解決在JVM上「傳統」非同步方式的弊端,同時也關注一些額外方面:

可組合性和可讀性。

資料作為流,被豐富的操作符操作,什麼都不會發生,直到你訂閱,後壓,消費者通知生產者發射的速率太快了,高級別而不是高數值抽象。

可組合性和可讀性

可組合性,其實就是編排多個非同步任務的能力,使前一個任務的結果作為後續任務的輸入,或以fork-join(分叉-合併)的方式執行若干個任務,或在更高的層級重複利用這些非同步任務。

任務編排的能力和程式碼的可讀性和可維護性緊密地耦合在一起。隨著非同步處理在數量和複雜度上的增加,組合和閱讀程式碼變得更加困難。

就像我們看到的,回呼模型雖然簡單,但是當回呼裡嵌套回調,達到多層時就會變成回調地獄。

Reactor提供豐富的組合選項,使嵌套層級最小,讓程式碼的組織結構能反映出在進行什麼樣的抽象處理,且通常保持在同層級上。

組裝線類比

你可以認為響應式應用程式處理資料就像透過一個組裝(生產)線。 Reactor既是傳送帶也是工作站。

原材料從一個來源(原始發布者)持續不斷地獲取,以一個完成的產品被推送給消費者(訂閱者)結束。

原料可以經過許多不同的轉換,如其它的中間步驟,或是一個更大裝配線的一部分。

如果在某個地方出現一個小故障或阻塞了,出問題的工作站可以向上游發出通知來限制原材料的流動(速率)。

運算子

在Reactor裡,運算子就是組裝線類比中的工作站。每一個操作符都會為一個發布者添加某些行為,把上一步的發布者包裝到一個新的實例裡。整個鏈就是這樣被連結起來的。

所以資料一開始從第一個發布者出來,然後沿著鏈往下游移動,並且被每個連結轉換。最後,一個訂閱者結束了這個處理。

響應式流規範並沒有明確規定操作符,不過Reactor就提供了豐富的操作符,它們涉及到很多方面,從簡單的轉換、過濾到複雜的編排、錯誤處理。

只要不訂閱,就什麼都不發生

當你寫一個發布者鏈時,默認,數據是不會開始進入鏈中的。相反,你只是創建了非同步處理的抽象描述。

透過訂閱這個行為(動作),才把發布者和訂閱者連結起來,然後才會觸發資料在鏈中流動。

這是在內部實現好的,透過來自訂閱者的request訊號往上游傳播,一路逆流而上直到最開始的發布者那裡。

Reactor核心特性

Reactor引入可組合響應式的類型,實現了發布者接口,但也提供了豐富的操作符,就是Flux和Mono。

Flux,流動,表示0到N個元素。

Mono,單個,表示0或1個元素。

它們之間的差異主要在語意上,表示非同步處理的粗略基數。

如一個http請求只會產生一個回應,把它表示為Monob9c6304980a6413006d7713f6edf94ae顯然更有意義,而且它只提供相對於0/1這樣上下文的操作符,因為此時count操作顯然沒有太大意義。

運算子可以改變處理的最大基數,也會切換到相關型別上。如count運算子雖然存在於Flux8742468051c85b06f0a0af9e3e506b5c上,但它的回傳值卻是個Monoad84a9ca022f0fe586e370cd3963bff2。

Flux8742468051c85b06f0a0af9e3e506b5c

一個Flux8742468051c85b06f0a0af9e3e506b5c是一個標準的Publisher8742468051c85b06f0a0af9e3e506b5c,表示一個非同步序列,可以發射0到N個元素,可以透過一個完成訊號或錯誤訊號終止。

就像在響應式流規範裡那樣,這3種類型的訊號轉化為對一個下游訂閱者的onNext,onComplete,onError3個方法的呼叫。

這3個方法也可以理解為事件/回調,而且它們都是可選的。

如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什麼實際用途,可用於測試)。

無限序列也沒有必要是空的,如Flux.interval(Duration)產生一個Fluxad84a9ca022f0fe586e370cd3963bff2 ,它是無限的,從鐘錶裡發射出的規則的「嘀嗒」。

Mono8742468051c85b06f0a0af9e3e506b5c

一個Mono8742468051c85b06f0a0af9e3e506b5c是一個特殊的Publisher8742468051c85b06f0a0af9e3e506b5c,最多發射一個元素,可以使用onComplete訊號或onError訊號來終止。

它提供的運算子只是Flux提供的子集,同樣,有些運算子(如把Mono和Publisher結合)可以把它切換到一個Flux。

如Mono#concatWith(Publisher)回傳一個Flux,然而Mono#then(Mono)回傳的是另一個Mono。

Mono可以用來表示沒有回傳值的非同步處理(與Runnable相似),用Mono903bf37051cf83cbd9686768ac0189ae表示。

建立Flux或Mono,並訂閱它們

最容易的方式就是使用它們各自的工廠方法:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");

當談到訂閱時,可以使用Java 8的lambda表達式,訂閱方法有多種不同的變體,帶有不同的回呼。

下面是方法簽章:

//订阅并触发序列
subscribe(); 
//可以对每一个产生的值进行处理
subscribe(Consumer<? super T> consumer); 
//还可以响应一个错误
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
//还可以在成功结束后执行一些代码
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
//还可以对Subscription执行一些操作
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

使用Disposable取消訂閱

這些基於lambda的訂閱方法都會傳回一個Disposable類型,透過呼叫它的dispose()來取消這個訂閱。

對於Flux和Mono,取消就是一個訊號,表示來源應該停止生產元素。然而,不保證立即生效,一些來源可能生產元素非常快,以致於還沒有收到取消訊號就已經生產完了。

推薦相關文章:《java開發教學

以上是java響應式程式設計是什麼?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn