How do we implement Subscriber interface in Java 9?

WBOY
2023-09-04 13:33:07

Java 9 supports reactive streams through the java.util.concurrent.Flow class, which declares the nested interfaces Publisher, Subscriber, Subscription and Processor, and through the SubmissionPublisher class, which implements Flow.Publisher. Each interface plays a distinct role defined by the Reactive Streams specification.

A Subscriber receives the items that a Publisher publishes. To create one, implement the Flow.Subscriber<T> interface, provide all four of its abstract methods, and pass the instance to the publisher's subscribe() method.

Methods of the Flow.Subscriber interface:

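  • onComplete(): Called when the Publisher has no more items to send and the stream ended without an error. No further methods are invoked afterwards.
  • onError(Throwable): Called when the Publisher or the Subscription hits an unrecoverable error. No further methods are invoked afterwards.
  • onNext(T): Called with the next item. Items are delivered only up to the number requested through Subscription.request().
  • onSubscribe(Flow.Subscription): Called first, before any other method, when the Publisher registers the Subscriber. It hands over the Subscription used to request items or to cancel.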
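
The callback order described above can be observed with a small recording subscriber. The following is a minimal sketch, not part of the original example: the names CallbackOrderSketch, RecordingSubscriber and run, and the "boom" error message, are hypothetical. It ends the stream either with close() or with closeExceptionally(), and it makes no assumption about whether items still buffered are delivered before onError().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class CallbackOrderSketch {

    // Hypothetical subscriber that records the name of every callback it receives.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError:" + throwable.getMessage());
            finished.countDown(); // terminal signal
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown(); // terminal signal
        }
    }

    // Publishes "a" and "b", then ends the stream normally or with an error.
    static List<String> run(boolean fail) throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.submit("a");
        publisher.submit("b");
        if (fail) {
            publisher.closeExceptionally(new IllegalStateException("boom"));
        } else {
            publisher.close();
        }
        subscriber.finished.await(); // wait for onComplete() or onError()
        return subscriber.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

On the normal path the recorded list starts with onSubscribe, continues with one onNext entry per item, and ends with onComplete. On the error path the last entry is the onError signal and onComplete never appears, because a subscription ends with at most one terminal signal.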

Example

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class SubscriberImplTest {
   public static class Subscriber implements Flow.Subscriber<Integer> {
      private Flow.Subscription subscription;
      // volatile: written on the publisher's executor thread, read by main()
      private volatile boolean isDone;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
         System.out.println("Subscribed");
         this.subscription = subscription;
         this.subscription.request(1); // ask for the first item
      }
      @Override
      public void onNext(Integer item) {
         System.out.println("Processing " + item);
         this.subscription.request(1); // ask for the next item
      }
      @Override
      public void onError(Throwable throwable) {
         throwable.printStackTrace();
         isDone = true; // an error also ends the stream, so let main() exit
      }
      @Override
      public void onComplete() {
         System.out.println("Processing done");
         isDone = true;
      }
   }
   public static void main(String[] args) throws InterruptedException {
      SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
      Subscriber subscriber = new Subscriber();
      publisher.subscribe(subscriber);
      IntStream intData = IntStream.rangeClosed(1, 10);
      intData.forEach(publisher::submit);
      publisher.close(); // signals onComplete() once buffered items are delivered
      // Delivery is asynchronous, so wait until the subscriber has finished
      while (!subscriber.isDone) {
         Thread.sleep(10);
      }
      System.out.println("Done");
   }
}
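
The request(1) calls in the example are what control how many items the subscriber receives. As a hedged illustration of that demand mechanism, the sketch below shows a subscriber that stops asking after a fixed number of items and cancels its subscription. The names LimitedDemandSketch, TakeSubscriber and take are hypothetical, the limit is assumed to be at least 1, and the CountDownLatch is one possible alternative to polling a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class LimitedDemandSketch {

    // Hypothetical subscriber that accepts at most `limit` items (limit >= 1), then cancels.
    static class TakeSubscriber implements Flow.Subscriber<Integer> {
        private final int limit;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // demand exactly one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();   // no more items wanted
                finished.countDown();    // cancel() triggers no onComplete()
            } else {
                subscription.request(1); // demand the next item
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown(); // publisher ran out before the limit was reached
        }
    }

    // Publishes 1..upTo and returns the items the subscriber actually accepted.
    static List<Integer> take(int limit, int upTo) throws InterruptedException {
        TakeSubscriber subscriber = new TakeSubscriber(limit);
        // SubmissionPublisher is AutoCloseable; leaving the block calls close()
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= upTo; i++) {
                publisher.submit(i);
            }
        }
        subscriber.finished.await();
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(take(3, 10));
    }
}
```

Because the subscriber never requests more than one item at a time, nothing further is delivered once it cancels, even though the publisher submitted all ten values.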

Output

Subscribed
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10
Processing done
Done

Statement:
This article is reproduced from tutorialspoint.com.