Maison >Java >javaDidacticiel >Comment utiliser la nouvelle fonctionnalité de l'API de programmation réactive Java9 Reactive Stream
Java 9 fournit un ensemble d'interfaces qui définissent la programmation de flux réactif. Toutes ces interfaces sont définies dans la classe java.util.concurrent.Flow
en tant qu'interfaces internes statiques. java.util.concurrent.Flow
类里面。
下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下
发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。
订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。
订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription
订阅令牌对象通过此方法传递给订阅者。
onNext
:下一个待处理的数据项的处理函数
onError
:在发布者或订阅遇到不可恢复的错误时调用
onComplete
:当没有订阅者调用(包括onNext()方法)发生时调用。
订阅令牌对象通过Subscriber.onSubscribe()
方法传递
public static interface Subscription { public void request(long n); public void cancel();}
request(long n)
是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()
由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。
处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
现在我们要去实现上面的四个接口来完成响应式编程
Subscription Interface
订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。
Publisher Interface
发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。
Subscriber Interface
订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。
Processor
实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口
下面的例子实现的式字符串的数据消息订阅处理
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //订阅令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("订阅关系建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立订阅关系,可以有多个订阅者 sb.submit("数据 1"); //发送消息1 sb.submit("数据 2"); //发送消息2 sb.submit("数据 3"); //发送消息3 executor.shutdown(); } }
控制台打印输出结果
订阅关系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2
请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()
方法中使用了subscription.request(2);
onSubscribe
: après que l'éditeur a accepté l'action d'abonnement de l'abonné, appelé avant de publier des messages d'abonnement . L'objet jeton d'abonnement Subscription
nouvellement créé est transmis à l'abonné via cette méthode. 🎜🎜onNext
: La fonction de traitement de la prochaine donnée à traiter 🎜🎜onError
: Appelée lorsque l'éditeur ou l'abonnement rencontre une erreur irrécupérable🎜🎜 onComplete : appelé lorsqu'aucun appel d'abonné (y compris la méthode onNext()) ne se produit. 🎜<h4>2.2.Interface d'abonnement (interface de jeton d'abonnement)</h4>🎜L'objet jeton d'abonnement est transmis via la méthode <code>Subscriber.onSubscribe()
🎜rrreee🎜request(long n)
est la méthode clé derrière le concept de contre-pression non bloquante. Les abonnés l'utilisent pour demander plus de n éléments de consommation. De cette manière, l'abonné contrôle la quantité de données qu'il peut actuellement recevoir. cancel()
L'abonné annule activement son abonnement. Après annulation, il ne recevra aucun message de données. 🎜Interface d'abonnement
L'interface du jeton d'abonnement ne nous oblige généralement pas à la programmer nous-mêmes, nous juste Vous il suffit de connaître la signification de la méthode request() et de la méthode Cancel(). 🎜🎜Publisher Interface
Interface Publisher, Java 9 nous a fourni l'implémentation de SubmissionPublisher par défaut En plus de la méthode d'implémentation de l'interface Publisher, cette classe d'implémentation fournit une méthode appelée submit() pour compléter. les données du message envoyées. 🎜🎜Interface d'abonné
L'interface d'abonné doit généralement être implémentée par nous-mêmes. Parce qu'une fois l'abonnement aux données reçu, différentes entreprises ont une logique de traitement différente. 🎜🎜Processeur
est en fait une collection d'interfaces d'éditeur et d'interface d'abonné. Cette interface nécessite une conversion de type de données et un traitement de données. 🎜🎜L'exemple suivant implémente un traitement d'abonnement de message de données de chaîne🎜🎜Établissement de la relation d'abonnement🎜Veuillez noter : même si l'éditeur a soumis 3 éléments de données, MySubscriber n'en a reçu que 2. Les données ont été traitées. C'est parce que nous avons utilisé
onSubscribe : java util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item : Données 1
item : Données 2🎜
subscription.request(2);
dans la méthode MySubscriber#onSubscribe()
. Il s'agit de l'effet de programmation réactive de la « contre-pression ». Autant de données que j'ai la capacité de traiter, j'informerai l'éditeur du message de la quantité de données à fournir. 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!