Maison  >  Article  >  Java  >  Comment implémentons-nous l’interface Abonné dans Java 9 ?

Comment implémentons-nous l’interface Abonné dans Java 9 ?

WBOY
WBOYavant
2023-09-04 13:33:07783parcourir

在Java 9中,我们如何实现Subscriber接口?

Java 9 prend en charge la création de flux réactifs en introduisant certaines interfaces : Publisher, Subscriber, Subscription et SubmissionPublisher qui implémentent le Classe d'interface éditeur . Chaque interface peut jouer un rôle différent basé sur les principes du Reactive Streaming.

Nous pouvons utiliser l'interface Abonné pour nous abonner aux données publiées par éditeur. Nous devons implémenter l'interface Subscriber et fournir des implémentations pour les méthodes abstraites.

Méthodes de l'interface Flow.Subscriber :

  • onComplete() : Cette méthode est appelée lorsque l'objet Publisher termine son rôle.
  • onError() : Cette méthode est appelée lorsque l'éditeur rencontre un problème et en informe l'abonné.
  • onNext() : Cette méthode est appelée lorsque l'éditeur dispose de nouvelles informations pour informer tous les abonnés.
  • onSubscribe() : Cette méthode est appelée lorsque l'éditeur ajoute un abonné.

Exemple

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class SubscriberImplTest {
   public static class Subscriber implements <strong>Flow.Subscriber<Integer></strong> {
      private <strong>Flow.Subscription</strong> subscription;
      private boolean isDone;
      
     <strong> @Override</strong>
      public void <strong>onSubscribe</strong>(Flow.Subscription subscription) {
         System.out.println("Subscribed");
         this.subscription = subscription;
         this.subscription.request(1);
      }
      <strong>@Override</strong>
      public void <strong>onNext</strong>(Integer item) {
         System.out.println("Processing " + item);
         this.subscription.request(1);
      }
      <strong>@Override</strong>
      public void <strong>onError</strong>(Throwable throwable) {
         throwable.printStackTrace();
      }
      <strong>@Override</strong>
      public void <strong>onComplete()</strong> {
         System.out.println("Processing done");
         isDone = true;
      }
   }
   public static void main(String args[]) throws InterruptedException {
      <strong>SubmissionPublisher<Integer></strong> publisher = new <strong>SubmissionPublisher<></strong><strong>()</strong>;
      <strong>Subscriber </strong>subscriber = new <strong>Subscriber()</strong>;
      publisher.subscribe(subscriber);
      <strong>IntStream</strong> intData = <strong>IntStream.rangeClosed</strong>(1, 10);
      intData.forEach(<strong>publisher::submit</strong>);
      publisher.<strong>close()</strong>;
      while(!subscriber.isDone) {
         Thread.sleep(10);
      }
      System.out.println("Done");
   }
}

Sortie

<strong>Subscribed
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10
Processing done
Done</strong>

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer