推薦教學:《java影片教學》
#java響應式程式設計是什麼?
java響應式程式設計是
響應式程式設計
作為響應式程式設計方向上的第一步,微軟在.NET生態系統中創建了Rx函式庫(Reactive Extensions)。 RxJava是在JVM上對它的實作。
響應式程式設計是一個非同步程式設計範式,通常出現在物件導向的語言中,作為觀察者模式的一個擴展。
它關注數據的流動、變化的傳播。這意味著可以輕易地使用程式語言表示靜態(如陣列)或動態(如事件發射源)資料流。
響應式串流
隨著時間的推移,一個專門為Java的標準化出現了。它是一個規範,定義了一些介面和互動規則,用於JVM平台上的響應式函式庫。它就是響應式流(Reactive Streams),它的這些介面已經被整合到Java 9裡,在java.util.concurrent.Flow這個父類別裡。響應式流和迭代器較相似,不過迭代器是基於「拉」(pull)的,而響應式流是基於「推」(push)的。迭代器的使用其實是命令式編程,因為由開發者決定何時調用next()來取得下一個元素。在響應式串流中,與上面等價的是發布者-訂閱者。但當有新的可用元素時,是由發布者推給訂閱者的。這個「推」就是響應式的關鍵。
另外,被推過來元素的操作也是以宣告的方式進行的,程式設計師只需表達做什麼就行了,不需要管怎麼做。
發布者使用onNext方法向訂閱者推送新元素,使用onError方法告知錯誤,使用onComplete方法告知已經結束。
可見,錯誤處理和完成(結束)也是以一個很好的方式被處理。錯誤和結束都可以終止序列。
這種方式非常有彈性。這種模式支援0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,如果滴答的鐘錶)這些情況。
Reactor粉墨登場
Reactor是第四代響應式函式庫,是一個響應式程式設計範式的實現,用於在JVM平台上基於響應式串流規範構建非阻塞異步應用。
它大大實現了JVM上響應式串流的規範(http://www.reactive-streams.org/)。
它是一個完全非阻塞響應式程式設計的基石,帶有高效需求管理(以管理「後壓」的形式)。
它直接整合Java函式API,特別是CompletableFuture,Stream和Duration。
它支援使用reactor-netty工程實現非阻塞跨進程通信,適合微服務架構,支援HTTP(包括Websockets),TCP和UDP。
註:Reactor要Java 8
講了這麼多,是不是要先思考下,為什麼我們需要這樣一個非同步的響應式函式庫?
阻塞就是浪費
現代的應用能達到非常多的並髮用戶,即使現代硬體的能力被持續改進,現代軟體的性能仍然是一個關鍵的關注點
大體上有兩種方式可以改進一個程式的效能:
1、並行化,使用更多的執行緒和更多的硬體資源
#2 、提高效率,在目前資源用量的情況下尋求更高效率
通常,Java開發者使用阻塞程式碼來寫程式。這種實踐性很好,直到遇到效能瓶頸。
此時會引入額外線程,並執行相似的阻塞程式碼。但是這種擴展方法在資源利用方面會引起爭論和導致並發問題。
更糟的是,阻塞浪費資源。如果你仔細看,一旦一個程式涉及到一些延遲(特別是I/O,像資料庫請求或網路呼叫),資源就被浪費,因為線程現在是空閒的,在等待資料。
所以並行化方式不是銀彈。我們有必要讓硬體發揮完全的力量,但是關於資源浪費的影響和原因也是非常複雜的。
非同步性來營救
前面提到的第二種方式是尋求更高效率,可以作為資源浪費問題的一個解決方案。
透過寫非同步非阻塞程式碼,你能讓執行切換到其它活動的任務,使用相同的底層資源,稍後再回到目前的處理上。
但是要如何產生非同步程式碼到JVM上呢? Java提供兩種非同步程式設計模型:
1、Callbacks,非同步方法沒有回傳值,但是會帶一個回調,當結果可用時回調會被呼叫。
2、Futures,非同步方法立即傳回一個Future8742468051c85b06f0a0af9e3e506b5c,非同步處理過程就是計算一個T值,使用Future物件包裝了對它的存取。這個值不是立即可用的,該物件可以被輪詢來查看T值是否可用。
這兩種技術都夠好嗎?並不是對每種情況都是的,兩種方式都有限制。
回呼比較難於組合在一起,很快就會導致程式碼難以閱讀和維護(眾所周知的「回調地獄」)。
與回呼相比,Futures稍微好一點,但仍然在組合方面做得不好。組合多個Futures物件到一起是可行的但是並不容易。
Future
也有其它問題,很容易因為呼叫了get()方法造成了另一個阻塞。
另外,它不支援延遲計算,缺乏對多個值的支持,缺乏高階錯誤處理。
從命令式到響應式程式設計
像Reactor這樣的響應式函式庫的目標就是解決在JVM上「傳統」非同步方式的弊端,同時也關注一些額外方面:
可組合性和可讀性。
資料作為流,被豐富的操作符操作,什麼都不會發生,直到你訂閱,後壓,消費者通知生產者發射的速率太快了,高級別而不是高數值抽象。
可組合性和可讀性
可組合性,其實就是編排多個非同步任務的能力,使前一個任務的結果作為後續任務的輸入,或以fork-join(分叉-合併)的方式執行若干個任務,或在更高的層級重複利用這些非同步任務。
任務編排的能力和程式碼的可讀性和可維護性緊密地耦合在一起。隨著非同步處理在數量和複雜度上的增加,組合和閱讀程式碼變得更加困難。
就像我們看到的,回呼模型雖然簡單,但是當回呼裡嵌套回調,達到多層時就會變成回調地獄。
Reactor提供豐富的組合選項,使嵌套層級最小,讓程式碼的組織結構能反映出在進行什麼樣的抽象處理,且通常保持在同層級上。
組裝線類比
你可以認為響應式應用程式處理資料就像透過一個組裝(生產)線。 Reactor既是傳送帶也是工作站。
原材料從一個來源(原始發布者)持續不斷地獲取,以一個完成的產品被推送給消費者(訂閱者)結束。
原料可以經過許多不同的轉換,如其它的中間步驟,或是一個更大裝配線的一部分。
如果在某個地方出現一個小故障或阻塞了,出問題的工作站可以向上游發出通知來限制原材料的流動(速率)。
運算子
在Reactor裡,運算子就是組裝線類比中的工作站。每一個操作符都會為一個發布者添加某些行為,把上一步的發布者包裝到一個新的實例裡。整個鏈就是這樣被連結起來的。
所以資料一開始從第一個發布者出來,然後沿著鏈往下游移動,並且被每個連結轉換。最後,一個訂閱者結束了這個處理。
響應式流規範並沒有明確規定操作符,不過Reactor就提供了豐富的操作符,它們涉及到很多方面,從簡單的轉換、過濾到複雜的編排、錯誤處理。
只要不訂閱,就什麼都不發生
當你寫一個發布者鏈時,默認,數據是不會開始進入鏈中的。相反,你只是創建了非同步處理的抽象描述。
透過訂閱這個行為(動作),才把發布者和訂閱者連結起來,然後才會觸發資料在鏈中流動。
這是在內部實現好的,透過來自訂閱者的request訊號往上游傳播,一路逆流而上直到最開始的發布者那裡。
Reactor核心特性
Reactor引入可組合響應式的類型,實現了發布者接口,但也提供了豐富的操作符,就是Flux和Mono。
Flux
,流動,表示0到N個元素。
Mono
,單個,表示0或1個元素。
它們之間的差異主要在語意上,表示非同步處理的粗略基數。
如一個http請求只會產生一個回應,把它表示為Monob9c6304980a6413006d7713f6edf94ae顯然更有意義,而且它只提供相對於0/1這樣上下文的操作符,因為此時count操作顯然沒有太大意義。
運算子可以改變處理的最大基數,也會切換到相關型別上。如count運算子雖然存在於Flux8742468051c85b06f0a0af9e3e506b5c上,但它的回傳值卻是個Monoad84a9ca022f0fe586e370cd3963bff2。
Flux8742468051c85b06f0a0af9e3e506b5c
一個Flux8742468051c85b06f0a0af9e3e506b5c是一個標準的Publisher8742468051c85b06f0a0af9e3e506b5c,表示一個非同步序列,可以發射0到N個元素,可以透過一個完成訊號或錯誤訊號終止。
就像在響應式流規範裡那樣,這3種類型的訊號轉化為對一個下游訂閱者的onNext,onComplete,onError3個方法的呼叫。
這3個方法也可以理解為事件/回調,而且它們都是可選的。
如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什麼實際用途,可用於測試)。
無限序列也沒有必要是空的,如Flux.interval(Duration)產生一個Fluxad84a9ca022f0fe586e370cd3963bff2 ,它是無限的,從鐘錶裡發射出的規則的「嘀嗒」。
Mono8742468051c85b06f0a0af9e3e506b5c
一個Mono8742468051c85b06f0a0af9e3e506b5c是一個特殊的Publisher8742468051c85b06f0a0af9e3e506b5c,最多發射一個元素,可以使用onComplete訊號或onError訊號來終止。
它提供的運算子只是Flux提供的子集,同樣,有些運算子(如把Mono和Publisher結合)可以把它切換到一個Flux。
如Mono#concatWith(Publisher)回傳一個Flux,然而Mono#then(Mono)回傳的是另一個Mono。
Mono可以用來表示沒有回傳值的非同步處理(與Runnable相似),用Mono903bf37051cf83cbd9686768ac0189ae表示。
建立Flux或Mono,並訂閱它們
最容易的方式就是使用它們各自的工廠方法:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
當談到訂閱時,可以使用Java 8的lambda表達式,訂閱方法有多種不同的變體,帶有不同的回呼。
下面是方法簽章:
//订阅并触发序列 subscribe(); //可以对每一个产生的值进行处理 subscribe(Consumer<? super T> consumer); //还可以响应一个错误 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //还可以在成功结束后执行一些代码 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //还可以对Subscription执行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
使用Disposable取消訂閱
這些基於lambda的訂閱方法都會傳回一個Disposable類型,透過呼叫它的dispose()來取消這個訂閱。
對於Flux和Mono,取消就是一個訊號,表示來源應該停止生產元素。然而,不保證立即生效,一些來源可能生產元素非常快,以致於還沒有收到取消訊號就已經生產完了。
推薦相關文章:《java開發教學》
以上是java響應式程式設計是什麼?的詳細內容。更多資訊請關注PHP中文網其他相關文章!