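The Flow API, available since Java 9, is the JDK's official support for the Reactive Streams specification. It combines two patterns: Iterator (the subscriber pulls by requesting items) and Observer (the publisher pushes items to the subscriber). The Flow API is an interoperability specification, not an end-user API like RxJava.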
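To make the Iterator/Observer combination concrete, here is a minimal sketch rather than a definitive implementation. The class PullPushDemo, the subscriber TakeSubscriber and the helper method take are hypothetical names introduced only for illustration; Flow and SubmissionPublisher are the JDK classes. The subscriber pulls by calling request(n), and the publisher pushes by calling onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a subscriber that asks for a fixed number of items.
public class PullPushDemo {

    static class TakeSubscriber<T> implements Flow.Subscriber<T> {
        private final int limit; // must be positive; request(0) is an error
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(limit); // pull: signal demand for `limit` items
        }

        @Override
        public void onNext(T item) { // push: the publisher delivers an item
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel(); // nothing more is wanted
                done.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes `source` and returns what a subscriber with demand `limit` received.
    static <T> List<T> take(int limit, List<T> source) {
        TakeSubscriber<T> subscriber = new TakeSubscriber<>(limit);
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            source.forEach(publisher::submit);
        } // close() signals onComplete to subscribers that have not cancelled
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(take(3, List.of(10, 20, 30, 40, 50))); // prints [10, 20, 30]
    }
}
```

Because the publisher never delivers more than the subscriber has requested, the last two items are never pushed. This is how the API lets a slow consumer limit a fast producer.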
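The Flow API consists of four basic interfaces, all nested in java.util.concurrent.Flow: Publisher (produces items and accepts subscribers), Subscriber (receives items and completion or error signals), Subscription (links one publisher to one subscriber and carries the request and cancel calls), and Processor (acts as both a Subscriber and a Publisher).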
In the example below, we create a basic subscriber that requests one item, prints it, and then requests the next one. We use the publisher implementation provided by the JDK, SubmissionPublisher, to publish the items.
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T> implements Flow.Subscriber<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // ask for the first item
    }

    @Override
    public void onNext(T item) {
        System.out.println(item);
        subscription.request(1); // ask for the next item
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        done.countDown();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
        done.countDown();
    }

    // Blocks until onComplete or onError has been signalled.
    void await() throws InterruptedException {
        done.await();
    }
}

// main class
public class FlowTest {
    public static void main(String[] args) throws InterruptedException {
        List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);
        for (String s : items) {
            Thread.sleep(1000);
            publisher.submit(s);
        }
        publisher.close();
        // Delivery is asynchronous, so wait for the subscriber to finish;
        // otherwise the JVM may exit before the last items are printed.
        subscriber.await();
    }
}
Output:
1
2
3
4
5
6
7
8
9
10
Done
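The example above exercises Publisher, Subscriber and Subscription. The remaining interface, Processor, sits between a publisher and a subscriber and can transform items on the way through. The following is a sketch under stated assumptions: ProcessorDemo, MapProcessor, Collector and lengths are hypothetical names chosen for illustration. Extending SubmissionPublisher is just one convenient way to obtain the Publisher half of a Processor, and follows the transform example in the SubmissionPublisher Javadoc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorDemo {

    // Hypothetical processor: applies `mapper` to each item and republishes the result.
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.upstream = subscription;
            upstream.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // forward the transformed item downstream
            upstream.request(1);        // then ask upstream for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    // Hypothetical sink: collects every item and records completion.
    static class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(T item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Pipeline: source publisher -> MapProcessor(String::length) -> Collector.
    static List<Integer> lengths(List<String> words) {
        MapProcessor<String, Integer> toLength = new MapProcessor<>(String::length);
        Collector<Integer> sink = new Collector<>();
        toLength.subscribe(sink);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(toLength);
            words.forEach(source::submit);
        } // closing the source completes the processor, which then completes the sink
        try {
            sink.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(sink.items);
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of("a", "bb", "ccc"))); // prints [1, 2, 3]
    }
}
```

Only the source is closed explicitly. The processor closes itself in onComplete, after it has forwarded every item; closing it earlier from the main thread could make a later submit call fail with an IllegalStateException.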