首頁 >Java >java教程 >java9新功能Reactive Stream響應式程式設計API怎麼用

java9新功能Reactive Stream響應式程式設計API怎麼用

王林
王林轉載
2023-05-10 13:46:061563瀏覽

一、Java9 Reactive Stream API

Java 9提供了一組定義響應式串流程式設計的介面。所有這些介面都作為靜態內部介面定義在java.util.concurrent.Flow類別裡面。

java9新功能Reactive Stream響應式程式設計API怎麼用

以下是Java 響應式程式設計中的一些重要角色和概念,先簡單理解一下

發布者(Publisher)是潛在的無限數量的有序資料元素的生產者。它根據收到的需求(subscription)向目前訂閱者發布一定數量的資料元素。

訂閱者(Subscriber)從發布者訂閱並接收資料元素。與發布者建立訂閱關係後,發布者向訂閱者發送訂閱令牌(subscription),訂閱者可以根據自己的處理能力請求發布者發布資料元素的數量。

訂閱令牌(subscription)表示訂閱者與發布者之間建立的訂閱關係。當建立訂閱關係後,發布者將其傳遞給訂閱者。訂閱者使用訂閱令牌與發布者進行交互,例如請求資料元素的數量或取消訂閱。

二、Java響應式程式設計四大介面

2.1.Subscriber Interface(訂閱者訂閱介面)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe#:在發布者接受訂閱者的訂閱動作之後,發布任何的訂閱訊息之前被呼叫。新建立的Subscription訂閱令牌物件透過此方法傳遞給訂閱者。

onNext:下一個待處理的資料項目的處理函數

#onError:在發布者或訂閱遇到不可復原的錯誤時呼叫

onComplete:當沒有訂閱者呼叫(包括onNext()方法)發生時呼叫。

2.2.Subscription Interface (訂閱令牌介面)

訂閱令牌物件透過Subscriber.onSubscribe()方法傳遞

public static interface Subscription {    public void request(long n);    public void cancel();}

# request(long n)是無阻塞背壓概念背後的關鍵方法。訂閱者使用它來請求n個以上的消費項目。這樣,訂閱者就控制了它目前能夠接收多少個資料。 cancel()由訂閱者主動來取消其訂閱,取消後將不會在接收任何資料訊息。

2.3.Publisher Interface(發布者介面)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

呼叫此方法,建立訂閱者Subscriber與發布者Publisher之間的訊息訂閱關係。

2.4.Processor Interface(處理器介面)

處理者Processor 可以同時充當訂閱者和發布者,起到轉換發布者——訂閱者管道中的元素的作用。用於將發布者T類型的資料元素,接收並轉換為類型R的資料並發布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

二、實戰案例

現在我們要去實作上面的四個介面來完成響應式程式設計

Subscription Interface訂閱令牌接口通常不需要我們自己編程去實現,我們只需要在知道request()方法和cancle()方法意義即可。

Publisher Interface發布者接口,Java 9 已經預設為我們提供了實作SubmissionPublisher,該實作類別除了實作Publisher介面的方法外,提供了一個方法叫做submit()來完成訊息資料的發送。

Subscriber Interface訂閱者接口,通常需要我們自己去實作。因為在資料訂閱接收之後,不同的業務有不同的處理邏輯。

Processor實際上是Publisher Interface和Subscriber Interface的集合體,有需要資料類型轉換及資料處理的需求才去實現這個介面

下面的例子實現的式字串的資料訊息訂閱處理

實作訂閱者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher訊息發布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}

控制台列印輸出結果

訂閱關係建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 資料1
item: 資料2

#請注意:即使發布者submit了3條數據,MySubscriber也僅收到了2條數據進行了處理。是因為我們在MySubscriber#onSubscribe()方法中使用了subscription.request(2);。這就是「背壓」的響應式程式設計效果,我有能力處理多少數據,就會通知訊息發布者給多少數據。

以上是java9新功能Reactive Stream響應式程式設計API怎麼用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除