What is java reactive programming?

coldplay.xixi
2020-06-22 11:47:12

Reactive programming

As a first step in the direction of reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. RxJava then implemented reactive programming on the JVM.

Reactive programming is an asynchronous programming paradigm. In object-oriented languages it usually appears as an extension of the Observer pattern.

It is concerned with data streams and the propagation of change. This means that both static data streams (such as arrays) and dynamic ones (such as event emitters) can be expressed easily in the programming language.

Reactive Streams

Over time, a standard emerged for Java: Reactive Streams, a specification that defines a set of interfaces and interaction rules for reactive libraries on the JVM. Its interfaces were integrated into Java 9 as nested interfaces of the class java.util.concurrent.Flow.

Reactive streams resemble iterators, with one major difference: iterators are pull-based, while reactive streams are push-based. Using an iterator is imperative programming, because the developer decides when to call next() to get the next element. In Reactive Streams, the equivalent pair is publisher and subscriber, but it is the publisher that pushes new elements to the subscriber as they become available. This "push" is the key to being reactive.
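The pull/push contrast can be sketched with the JDK alone. The following is a minimal illustration, assuming Java 9 or later; the class name PullVsPush and its two methods are hypothetical names chosen for this example, and SubmissionPublisher is simply the ready-made Flow.Publisher that ships with the JDK, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PullVsPush {

    // Pull: the caller decides when to ask for the next element.
    static List<String> pull(List<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Push: the publisher calls the subscriber whenever an element is available.
    static List<String> push(List<String> source) {
        List<String> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE); // accept everything the publisher has
                }
                @Override public void onNext(String item) { seen.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            source.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            done.await(); // wait for the terminal signal before reading the list
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> data = List.of("foo", "bar", "foobar");
        System.out.println(pull(data));
        System.out.println(push(data));
    }
}
```

Both methods return the same elements in the same order. The difference is who drives the iteration: in pull the loop calls next(), in push the publisher calls onNext.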

In addition, operations on the pushed elements are expressed declaratively: the programmer states what to do with the values, not the exact control flow that does it.

The publisher calls the subscriber's onNext method to push a new element, onError to signal an error, and onComplete to signal that the sequence has ended.

Error handling and completion are therefore first-class parts of the model. Both an error and a completion terminate the sequence.

This approach is very flexible. The pattern supports sequences of zero elements, one element, or n elements, including infinite sequences such as the continuing ticks of a clock.

Reactor debuts

Reactor is a fourth-generation reactive library and an implementation of the reactive programming paradigm. It is based on the Reactive Streams specification and is used to build non-blocking, asynchronous applications on the JVM.

It implements the Reactive Streams specification on the JVM (http://www.reactive-streams.org/).

It is a fully non-blocking foundation for reactive programming, with efficient demand management (in the form of managing "backpressure").

It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration.

Through the reactor-netty project it supports non-blocking inter-process communication, which suits microservice architectures, over HTTP (including WebSockets), TCP, and UDP.

Note: Reactor requires Java 8 or later.

With all that said, it is worth first asking why we need such an asynchronous reactive library at all.

Blocking is waste

Modern applications can reach a very large number of concurrent users. Even though the capabilities of modern hardware keep improving, the performance of modern software remains a key concern.

There are generally two ways to improve the performance of a program:

1. Parallelize: use more threads and more hardware resources.

2. Improve efficiency: get more out of the resources already in use.

Usually, Java developers write programs with blocking code. This works well until you hit a performance bottleneck.

At that point, additional threads are introduced to run similar blocking code. But scaling resource usage this way can quickly introduce contention and concurrency problems.

Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because a thread sits idle, waiting for data.

So parallelization is not a silver bullet. It is necessary in order to use the full power of the hardware, but it is also complex to reason about and prone to wasting resources.

Asynchronicity to the rescue

The second way mentioned earlier is to seek higher efficiency, which can be used as a solution to the problem of resource waste.

By writing asynchronous, non-blocking code, you can switch execution to other active tasks, using the same underlying resources, and return to the current process later.

But how do you write asynchronous code on the JVM? Java provides two asynchronous programming models:

1. Callbacks. The asynchronous method has no return value but takes an extra callback parameter, which is called when the result is available.

2. Futures. The asynchronous method immediately returns a Future<T>. The asynchronous process computes a T value, and the Future object wraps access to it. The value is not immediately available; the object can be polled until the value is ready.
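The two models can be put side by side in a small sketch that uses only the JDK. Everything named here (CallbackVsFuture, loadName, loadNameAsync, loadNameFuture) is hypothetical and invented for illustration; loadName stands in for a slow database or network call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class CallbackVsFuture {

    // Hypothetical lookup standing in for a database or network call.
    static String loadName(int id) {
        return "user-" + id;
    }

    // Callback model: no return value; the result is handed to the callback.
    static void loadNameAsync(ExecutorService pool, int id, Consumer<String> callback) {
        pool.execute(() -> callback.accept(loadName(id)));
    }

    // Future model: returns at once with a handle to the eventual value.
    static Future<String> loadNameFuture(ExecutorService pool, int id) {
        Callable<String> task = () -> loadName(id);
        return pool.submit(task);
    }

    static String viaCallback(int id) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            loadNameAsync(pool, id, name -> {
                result.set(name);
                done.countDown();
            });
            done.await(); // only so that this demo can return the value
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    static String viaFuture(int id) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = loadNameFuture(pool, id);
            while (!future.isDone()) {
                Thread.yield(); // poll until the value is available
            }
            return future.get(); // returns immediately: the task is already done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaCallback(1));
        System.out.println(viaFuture(2));
    }
}
```

Note that both demo wrappers end up waiting on the calling thread, which is exactly the kind of coordination code these models push onto the caller.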

Are these two techniques good enough? Not for every use case: both approaches have limitations.

Callbacks are hard to compose and quickly lead to code that is difficult to read and maintain (the well-known "callback hell").

Futures are slightly better than callbacks, but they still do not compose well. Combining multiple Future objects is possible but not easy.

Future has other problems too. Calling the get() method easily ends up in another blocking situation.

In addition, Future does not support lazy computation, lacks support for multiple values, and lacks advanced error handling.
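As a rough illustration of what composing futures looks like in practice, here is a sketch based on the JDK's CompletableFuture. The class FutureComposition and the lookups findUserId and findCity are hypothetical, with trivial bodies standing in for real asynchronous work.

```java
import java.util.concurrent.CompletableFuture;

final class FutureComposition {

    // Hypothetical asynchronous lookups; the bodies are placeholders.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    static CompletableFuture<String> findCity(int userId) {
        return CompletableFuture.supplyAsync(() -> "city-" + userId);
    }

    // Sequential composition: the second call needs the result of the first.
    static String cityOf(String name) {
        return findUserId(name)
                .thenCompose(FutureComposition::findCity)
                .join(); // blocks the calling thread until the value is ready
    }

    // Fork-join composition: two independent calls, combined when both finish.
    static int combinedIds(String first, String second) {
        return findUserId(first)
                .thenCombine(findUserId(second), Integer::sum)
                .join(); // blocks again
    }

    public static void main(String[] args) {
        System.out.println(cityOf("alice"));
        System.out.println(combinedIds("ab", "cde"));
    }
}
```

Composition is possible, but each future still yields a single value, starts its work as soon as it is created, and the final join() (like get()) parks the calling thread.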

From imperative to reactive programming

Reactive libraries such as Reactor aim to address these shortcomings of the "traditional" asynchronous approaches on the JVM, while also focusing on a few additional aspects:

Composability and readability.

Data as a stream, manipulated with a rich vocabulary of operators.

Nothing happens until you subscribe.

Backpressure: the ability of the consumer to signal to the producer that the rate of emission is too high.

A high-level, high-value abstraction.

Composability and readability

Composability is the ability to orchestrate multiple asynchronous tasks: using the result of one task as the input of the next, running several tasks in a fork-join style, or reusing asynchronous tasks as components at a higher level.

The ability to orchestrate tasks is tightly coupled to the readability and maintainability of the code. As asynchronous processes grow in number and complexity, composing and reading the code becomes harder.

As we saw, the callback model is simple, but once callbacks are nested inside callbacks several levels deep, it turns into callback hell.

Reactor provides rich composition options that keep nesting to a minimum, so that the structure of the code reflects the abstract process being carried out and usually stays at a single level.
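A short sketch of what this flat style can look like, assuming reactor-core is on the classpath. The class ComposeTasks and the lookups findUserId and findCity are hypothetical; flatMap and Mono.zip are the Reactor operators used here for chaining and fork-join respectively.

```java
import reactor.core.publisher.Mono;

final class ComposeTasks {

    // Hypothetical asynchronous lookups; the bodies are placeholders.
    static Mono<Integer> findUserId(String name) {
        return Mono.fromSupplier(() -> name.length());
    }

    static Mono<String> findCity(int userId) {
        return Mono.just("city-" + userId);
    }

    // Chaining: the result of the first task feeds the second.
    static Mono<String> cityOf(String name) {
        return findUserId(name).flatMap(ComposeTasks::findCity);
    }

    // Fork-join: two independent pipelines, combined when both have a value.
    static Mono<String> bothCities(String first, String second) {
        return Mono.zip(cityOf(first), cityOf(second), (a, b) -> a + "," + b);
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this demo.
        System.out.println(bothCities("ab", "cde").block());
    }
}
```

The chain stays at one indentation level however many steps are added, and the composed Mono can itself be reused as a building block.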

Assembly Line Analogy

You can think of a reactive application processing data as if it were passed through an assembly (production) line. Reactor is both a conveyor belt and a workstation.

Raw materials are continuously obtained from a source (original publisher), ending with a completed product being pushed to consumers (subscribers).

The raw material can go through various transformations and other intermediate steps, or be part of a larger assembly line.

If there is a glitch or blockage somewhere, the offending workstation can send a notification upstream to limit the flow (rate) of raw materials.
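The upstream notification in this analogy corresponds to the request signal of Reactive Streams. The following JDK-only sketch (Java 9 or later) shows a subscriber that asks for one element at a time and cancels once it has enough; DemandDemo and takeOneAtATime are hypothetical names, and the limit is assumed to be at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class DemandDemo {

    // Receives at most `limit` elements (limit >= 1), requesting them one by one.
    static List<Integer> takeOneAtATime(List<Integer> source, int limit) {
        List<Integer> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // the workstation is ready for one item
                }

                @Override public void onNext(Integer item) {
                    seen.add(item);
                    if (seen.size() < limit) {
                        subscription.request(1); // ready for one more
                    } else {
                        subscription.cancel();   // tell upstream to stop
                        done.countDown();
                    }
                }

                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            source.forEach(publisher::submit); // buffered until requested
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(takeOneAtATime(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

The publisher never delivers more than the subscriber has requested, so the consumer, not the producer, sets the pace.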

Operators

In Reactor, operators are workstations in the assembly line analogy. Each operator adds some behavior to a publisher, wrapping the previous publisher into a new instance. This is how the entire chain is linked.

So the data initially comes out of the first publisher, then moves downstream along the chain, and is transformed by each link. Finally, a subscriber ends the process.

The Reactive Streams specification does not specify operators at all, but Reactor provides a rich set of them, covering everything from simple transformation and filtering to complex orchestration and error handling.
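A small operator chain, as a sketch that assumes reactor-core is on the classpath; OperatorChain and evenSquares are hypothetical names. Each call returns a new Flux that wraps the previous one.

```java
import reactor.core.publisher.Flux;

final class OperatorChain {

    static Flux<String> evenSquares(int count) {
        return Flux.range(1, count)        // source publisher: 1..count
                .filter(n -> n % 2 == 0)   // workstation 1: keep even numbers
                .map(n -> n * n)           // workstation 2: square them
                .map(n -> "sq=" + n);      // workstation 3: format as text
    }

    public static void main(String[] args) {
        // The subscriber at the end of the chain receives the finished product.
        evenSquares(6).subscribe(System.out::println);
    }
}
```

For evenSquares(6), the subscriber receives sq=4, sq=16, and sq=36, in that order.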

As long as you don't subscribe, nothing happens

When you write a publisher chain, data does not start flowing into it by default. Instead, you are only creating an abstract description of your asynchronous process.

The act of subscribing ties the publisher to a subscriber, and that is what triggers the flow of data through the whole chain.

Internally, this is achieved by a request signal from the subscriber that propagates upstream, all the way back to the source publisher.
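This laziness is easy to observe with a side-effect counter. The sketch below assumes reactor-core is on the classpath; LazyUntilSubscribed and run are hypothetical names. Flux.range is a synchronous source, so the whole sequence has been emitted by the time subscribe() returns.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

final class LazyUntilSubscribed {

    // Returns {elements seen before subscribing, elements seen after}.
    static int[] run() {
        AtomicInteger touched = new AtomicInteger();

        // Assembling the chain only describes the processing.
        Flux<Integer> pipeline = Flux.range(1, 3)
                .doOnNext(n -> touched.incrementAndGet());
        int before = touched.get(); // still 0: nothing has flowed yet

        pipeline.subscribe();       // now the data flows through the chain
        int after = touched.get();  // 3: every element passed doOnNext

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```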

Core Features of Reactor

Reactor introduces two composable reactive types, Flux and Mono, which implement the Publisher interface and also provide a rich set of operators.

Flux, flow, represents 0 to N elements.

Mono, single, represents 0 or 1 element.

The difference between them is mainly semantic: the type carries the rough cardinality of the asynchronous process.

For example, an HTTP request produces only one response, so it makes more sense to express it as a Mono, which offers only the operators relevant to a context of zero or one element; a count operation, for instance, would make little sense there.

Operators that change the maximum cardinality of the process also switch to the relevant type. For example, the count operator exists on Flux<T>, but it returns a Mono<Long>.

Flux<T>

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted elements, optionally terminated by either a completion signal or an error signal.

As in the Reactive Streams specification, these three kinds of signal translate into calls to a downstream subscriber's onNext, onComplete, and onError methods.

These three methods can also be seen as events or callbacks, and all of them are optional.

With no onNext but an onComplete, the Flux represents an empty finite sequence. With neither onNext nor onComplete, it represents an empty infinite sequence (of little practical use, except for testing).

An infinite sequence need not be empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
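The empty finite sequence and the ticking infinite sequence can both be tried out in a few lines. This sketch assumes reactor-core is on the classpath; FluxShapes and its methods are hypothetical names, and the 10 ms period and the 5 s timeout are arbitrary values chosen for the demo. take(n) is what turns the infinite interval into something that completes.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class FluxShapes {

    // Empty finite sequence: no onNext, just onComplete.
    static List<String> emptySequence() {
        return Flux.<String>empty().collectList().block();
    }

    // Infinite sequence of ticks, cut off after the first n values.
    static List<Long> firstTicks(int n) {
        return Flux.interval(Duration.ofMillis(10)) // emits 0, 1, 2, ... forever
                .take(n)                            // complete after n ticks
                .collectList()
                .block(Duration.ofSeconds(5));      // demo only: wait for the list
    }

    public static void main(String[] args) {
        System.out.println(emptySequence());
        System.out.println(firstTicks(3));
    }
}
```

Flux.never() would be the empty infinite case; it is left out here because nothing could ever be collected from it.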

Mono<T>

A Mono<T> is a specialized Publisher<T> that emits at most one element and then terminates with an onComplete signal or an onError signal.

It offers only a subset of the operators available on Flux. Some operators, notably those that combine the Mono with another Publisher, switch to a Flux.

For example, Mono#concatWith(Publisher) returns a Flux, whereas Mono#then(Mono) returns another Mono.

A Mono can also represent asynchronous processing that has no return value, only the notion of completion (similar to a Runnable). Such a process is typed as Mono<Void>.
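The type switches described above show up directly in the method signatures. A minimal sketch, assuming reactor-core is on the classpath; MonoShapes and its method names are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MonoShapes {

    // Combining a Mono with another Publisher widens the type to Flux.
    static Flux<String> concatenated() {
        return Mono.just("a").concatWith(Mono.just("b"));
    }

    // then(Mono) ignores the first value and stays within Mono.
    static Mono<String> chained() {
        return Mono.just("ignored").then(Mono.just("next"));
    }

    // Reducing a Flux to a single number narrows the type to Mono<Long>.
    static Mono<Long> counted() {
        return Flux.just("x", "y", "z").count();
    }

    // Work with no result: the Mono only signals completion.
    static Mono<Void> fireAndForget(Runnable task) {
        return Mono.fromRunnable(task);
    }

    public static void main(String[] args) {
        System.out.println(concatenated().collectList().block());
        System.out.println(chained().block());
        System.out.println(counted().block());
        fireAndForget(() -> System.out.println("ran")).block();
    }
}
```

Blocking on a Mono<Void> returns null, since there is no element to hand back, only the completion signal.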

Create a Flux or Mono, and subscribe to them

The easiest way is to use their respective factory methods:

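import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
// range(start, count): emits 5, 6, 7
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");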

When it comes to subscribing, Flux and Mono make use of Java 8 lambda expressions. The subscribe method has many variants that take different combinations of callbacks.

The method signatures are as follows:

// Subscribe and trigger the sequence.
subscribe();
// Do something with each produced value.
subscribe(Consumer<? super T> consumer);
// Deal with values, and also react to an error.
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);
// Deal with values and errors, and also run some code on successful completion.
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);
// As above, and also do something with the Subscription.
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);
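To see the three callbacks in action, here is a sketch that assumes reactor-core is on the classpath. SubscribeDemo and run are hypothetical names, and the rule that values above 3 are rejected is invented purely to trigger the error path.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class SubscribeDemo {

    // Emits 1..count and records every signal the subscriber receives.
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        Flux.range(1, count)
                .map(i -> {
                    if (i <= 3) {
                        return i;
                    }
                    throw new IllegalStateException("got to " + i);
                })
                .subscribe(
                        i -> log.add("next " + i),                // onNext
                        e -> log.add("error " + e.getMessage()),  // onError
                        () -> log.add("done"));                   // onComplete
        return log; // range is synchronous, so the log is already complete
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // [next 1, next 2, done]
        System.out.println(run(4)); // [next 1, next 2, next 3, error got to 4]
    }
}
```

A sequence ends with either the error callback or the completion callback, never both.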

Unsubscribe using Disposable

All of these lambda-based variants of subscribe() return a Disposable. Calling its dispose() method cancels the subscription.

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is not guaranteed to take effect immediately: some sources may produce elements so fast that they complete before the cancel signal is even received.
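A minimal sketch of cancelling a subscription, assuming reactor-core is on the classpath; DisposeDemo and cancelTicks are hypothetical names. An infinite Flux.interval is used as the source because it would otherwise never stop on its own.

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

final class DisposeDemo {

    // Returns true if the subscription was live first and disposed afterwards.
    static boolean cancelTicks() {
        Disposable subscription = Flux.interval(Duration.ofMillis(10))
                .subscribe(tick -> { /* consume ticks until cancelled */ });

        boolean liveBefore = !subscription.isDisposed();
        subscription.dispose(); // ask the source to stop producing
        boolean disposedAfter = subscription.isDisposed();

        return liveBefore && disposedAfter;
    }

    public static void main(String[] args) {
        System.out.println(cancelTicks());
    }
}
```

The sketch only checks the state of the Disposable. It deliberately makes no claim about how many ticks arrive before the cancellation reaches the source.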
