Heim  >  Artikel  >  Java  >  So nutzen Sie die neue Funktion der Java9 Reactive Stream Responsive Programming API

So nutzen Sie die neue Funktion der Java9 Reactive Stream Responsive Programming API

王林
王林nach vorne
2023-05-10 13:46:061470Durchsuche

1. Java9 Reactive Stream API

Java 9 bietet eine Reihe von Schnittstellen, die die reaktive Stream-Programmierung definieren. Alle diese Schnittstellen sind in der Klasse java.util.concurrent.Flow als statische interne Schnittstellen definiert. java.util.concurrent.Flow类里面。

So nutzen Sie die neue Funktion der Java9 Reactive Stream Responsive Programming API

下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下

发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。

订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。

订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。

二、Java响应式编程四大接口

2.1.Subscriber Interface(订阅者订阅接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription订阅令牌对象通过此方法传递给订阅者。

onNext:下一个待处理的数据项的处理函数

onError:在发布者或订阅遇到不可恢复的错误时调用

onComplete:当没有订阅者调用(包括onNext()方法)发生时调用。

2.2.Subscription Interface (订阅令牌接口)

订阅令牌对象通过Subscriber.onSubscribe()方法传递

public static interface Subscription {    public void request(long n);    public void cancel();}

request(long n)是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。

2.3.Publisher Interface(发布者接口)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。

2.4.Processor Interface(处理器接口)

处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

二、实战案例

现在我们要去实现上面的四个接口来完成响应式编程

Subscription Interface订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。

Publisher Interface发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。

Subscriber Interface订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。

Processor实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口

下面的例子实现的式字符串的数据消息订阅处理

实现订阅者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher消息发布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}

控制台打印输出结果

订阅关系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2

请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()方法中使用了subscription.request(2);

So verwenden Sie die neue Funktion der Java9 Reactive Stream Responsive Programming API 🎜🎜 Im Folgenden sind einige wichtige Rollen und Konzepte in der reaktiven Java-Programmierung aufgeführt. Lassen Sie uns zunächst kurz verstehen, dass der Herausgeber eine potenziell unbegrenzte Anzahl geordneter Datenelemente ist. Es veröffentlicht eine bestimmte Anzahl von Datenelementen an aktuelle Abonnenten basierend auf der eingegangenen Nachfrage (Abonnement). 🎜🎜Abonnent abonniert und erhält Datenelemente vom Herausgeber. Nach dem Aufbau einer Abonnementbeziehung mit dem Herausgeber sendet der Herausgeber ein Abonnement-Token (Abonnement) an den Abonnenten, und der Abonnent kann die Anzahl der vom Herausgeber veröffentlichten Datenelemente entsprechend seinen eigenen Verarbeitungsmöglichkeiten anfordern. 🎜🎜Das Abonnement-Token (Abonnement) stellt die zwischen dem Abonnenten und dem Herausgeber eingerichtete Abonnementbeziehung dar. Wenn eine Abonnementbeziehung zustande kommt, übergibt der Herausgeber diese an den Abonnenten. Abonnenten verwenden das Abonnement-Token, um mit dem Herausgeber zu interagieren, beispielsweise um die Anzahl der Datenelemente anzufordern oder sich abzumelden. 🎜🎜2. Die vier Hauptschnittstellen der Java-Responsive-Programmierung🎜

2.1.Subscriber-Schnittstelle

rrreee🎜onSubscribe: Wird aufgerufen, bevor der Herausgeber die Abonnementaktion des Abonnenten akzeptiert, bevor Abonnementnachrichten veröffentlicht werden . Das neu erstellte Subscription-Abonnement-Token-Objekt wird über diese Methode an den Abonnenten übergeben. 🎜🎜onNext: Die Verarbeitungsfunktion des nächsten zu verarbeitenden Datenelements 🎜🎜onError: Wird aufgerufen, wenn beim Herausgeber oder Abonnement ein nicht behebbarer Fehler auftritt🎜🎜onComplete: Wird aufgerufen, wenn keine Abonnentenaufrufe (einschließlich der onNext()-Methode) erfolgen. 🎜<h4>2.2.Subscription-Schnittstelle (Subscription-Token-Schnittstelle)</h4>🎜Das Abonnement-Token-Objekt wird über die Methode <code>Subscriber.onSubscribe() übergeben🎜rrreee🎜request(long n) ist die Schlüsselmethode hinter dem Konzept des nicht blockierenden Gegendrucks. Abonnenten fordern damit mehr als n Verbrauchsartikel an. Auf diese Weise steuert der Teilnehmer, wie viele Daten er aktuell empfangen kann. <code>cancel()Der Abonnent kündigt sein Abonnement aktiv. Nach der Kündigung erhält er keine Datennachrichten. 🎜

2.3.Publisher-Schnittstelle

rrreee🎜Rufen Sie diese Methode auf, um die Nachrichtenabonnementbeziehung zwischen dem Abonnenten und dem Herausgeber herzustellen. 🎜

2.4.Prozessorschnittstelle

🎜Der Prozessor kann sowohl als Abonnent als auch als Herausgeber fungieren und spielt die Rolle der Konvertierung von Elementen in der Herausgeber-Abonnenten-Pipeline. Wird verwendet, um Datenelemente vom Typ T vom Herausgeber zu empfangen, in Daten vom Typ R umzuwandeln und zu veröffentlichen. 🎜rrreee🎜2. Praktischer Fall🎜🎜Jetzt müssen wir die oben genannten vier Schnittstellen implementieren, um die reaktive Programmierung abzuschließen Sie müssen nur die Bedeutung der request()-Methode und der cancel()-Methode kennen. 🎜🎜Publisher-SchnittstellePublisher-Schnittstelle, Java 9 hat uns standardmäßig die Implementierung von SubmissionPublisher zur Verfügung gestellt. Zusätzlich zur Methode zur Implementierung der Publisher-Schnittstelle stellt diese Implementierungsklasse eine Methode namens „submit()“ zur Vervollständigung bereit die Nachrichtendaten senden. 🎜🎜Abonnentenschnittstelle Die Abonnentenschnittstelle muss normalerweise von uns selbst implementiert werden. Denn nach Erhalt des Datenabonnements verfügen verschiedene Unternehmen über unterschiedliche Verarbeitungslogiken. 🎜🎜Prozessor ist eigentlich eine Sammlung von Publisher-Schnittstellen und Abonnentenschnittstellen. Diese Schnittstelle erfordert eine Datentypkonvertierung und Datenverarbeitung. 🎜🎜Das folgende Beispiel implementiert eine Abonnementverarbeitung für Zeichenfolgendaten. 🎜

Abonnentenschnittstelle implementieren

rrreee

SubmissionPublisher-Nachrichtenverleger

rrreee🎜Ergebnisse der Konsolendruckausgabe🎜
🎜Einrichtung einer Abonnementbeziehung
onSubscribe: java. util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: Daten 1
item: Daten 2🎜
🎜Bitte beachten Sie: Auch wenn der Herausgeber 3 Daten übermittelt hat, hat MySubscriber nur 2 Daten erhalten. Das liegt daran, dass wir subscription.request(2); in der Methode MySubscriber#onSubscribe() verwendet haben. Dies ist der reaktive Programmiereffekt des „Gegendrucks“. So viele Daten ich verarbeiten kann, werde ich dem Herausgeber der Nachricht mitteilen, wie viele Daten er übermitteln soll. 🎜

Das obige ist der detaillierte Inhalt vonSo nutzen Sie die neue Funktion der Java9 Reactive Stream Responsive Programming API. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen