Series fluxes, where the second flux is created on the fly with the last value of the first flux?

王林
2024-02-12 12:00:10

This article looks at chaining two Reactor fluxes in series when the second flux can only be created from the last value emitted by the first. The elements of the first flux are emitted as usual, and once it completes, its last value is used to build the second flux on the fly.

Question content

I suspect this must be a duplicate, and that I am just googling the wrong terms.

I have two fluxes a and b, but b can only be created using the last value of a.

I want to create a flux that is essentially a concatenation of a and b, but the creation of b is deferred until we get the last value of a.

Maybe, it looks like this:

fluxC = fluxA.concatWith(lastA -> createFluxB(lastA))

Workaround

I don't know if there is any function in the library that does exactly this.

However, you can build such an operator as follows:

  1. Cache the latest value of the input flux.
  2. Use the standard concat operation, reading the last value from the cached flux to create the following sequence.

Note: It should not add much overhead, since the operation caches only one value at a time, and the second part of the algorithm should read the cached value directly without subscribing to the source flux again.
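To make the note above concrete, here is a minimal sketch of how `cache(1)` behaves for a late subscriber. The class name `CacheLastDemo`, the method `replayedToLateSubscriber`, and the subscription counter are hypothetical names introduced only for this illustration; it assumes Reactor Core is on the classpath.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class CacheLastDemo {

    /**
     * Drains a cached flux once, then returns what a second, late subscriber receives.
     * The given counter (a hypothetical helper) records how often the source is subscribed to.
     */
    static List<Integer> replayedToLateSubscriber(AtomicInteger sourceSubscriptions) {
        Flux<Integer> cached = Flux.just(1, 2, 3, 4)
                .doOnSubscribe(s -> sourceSubscriptions.incrementAndGet())
                .cache(1); // retain only the most recent element for replay

        cached.blockLast();                  // first subscriber consumes 1, 2, 3, 4
        return cached.collectList().block(); // late subscriber is served from the cache
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        System.out.println(replayedToLateSubscriber(subscriptions)); // [4]
        System.out.println(subscriptions.get());                     // 1
    }
}
```

The late subscriber only sees the last element, and the source is subscribed to once, which is what lets the second half of the concatenation read the last value cheaply.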

This is an example implementation and test:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class TestConcatLast {

    /**
     * Creates a flux that emits all elements of the source flux,
     * followed by all elements of a flux created by the provided function.
     * The function is invoked with the last element of the source flux.
     *
     * @param source The flux providing the first part of the concatenation.
     * @param errorIfSourceEmpty If true and the source is empty, the returned flux signals an error.
     *                           If false, an empty source produces an empty flux.
     * @param createFromLastElement A function that creates the tail of the concatenation from a given element.
     *                              It is invoked <em>at most once</em>, with the last element of the source flux.
     * @return The source elements followed by the elements of the flux built from the last one.
     */
    public <T> Flux<T> concatLast(Flux<T> source, boolean errorIfSourceEmpty, Function<T, Flux<T>> createFromLastElement) {
        var sourceWithLatestCached = source.cache(1);
        final Mono<T> deferLast = Mono.defer(errorIfSourceEmpty ? sourceWithLatestCached::last : sourceWithLatestCached::next);
        return sourceWithLatestCached.concatWith(
                deferLast.flatMapMany(createFromLastElement)
        );
    }

    @Test
    public void testConcat() {
        var nextExpectedElement = new AtomicInteger(1);
        var elts = Flux.just(1, 2, 3, 4)
                // Check that the cache works: each source element is seen exactly once and in order,
                // so the source is never subscribed to a second time
                .doOnNext(i -> {
                    assert nextExpectedElement.compareAndSet(i, i+1);
                });

        var concatenated = concatLast(elts, true, i -> Flux.just(i + 1, i + 2, i + 3));
        StepVerifier.create(concatenated)
                .expectNext(1, 2, 3, 4, 5, 6, 7)
                .verifyComplete();
    }
}
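The `errorIfSourceEmpty` flag is not exercised by the test above, so the following sketch shows both branches on an empty source. It restates `concatLast` as a static method (using a lambda instead of the method references, which should be equivalent) so the example is self-contained; `ConcatLastEmptyDemo`, `emptyLenient`, and `strictModeFails` are hypothetical names for illustration. It assumes that `Flux#last()` signals a `NoSuchElementException` on an empty flux.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatLastEmptyDemo {

    // Same logic as concatLast above, made static so it can be called without a test instance.
    static <T> Flux<T> concatLast(Flux<T> source, boolean errorIfSourceEmpty,
                                  Function<T, Flux<T>> createFromLastElement) {
        Flux<T> cached = source.cache(1);
        // last() errors on an empty flux, next() simply completes empty
        Mono<T> last = Mono.defer(() -> errorIfSourceEmpty ? cached.last() : cached.next());
        return cached.concatWith(last.flatMapMany(createFromLastElement));
    }

    /** Lenient mode: an empty source yields an empty result. */
    static List<Integer> emptyLenient() {
        return concatLast(Flux.<Integer>empty(), false, i -> Flux.just(i + 1))
                .collectList()
                .block();
    }

    /** Strict mode: returns true if an empty source ends with NoSuchElementException. */
    static boolean strictModeFails() {
        return concatLast(Flux.<Integer>empty(), true, i -> Flux.just(i + 1))
                .then(Mono.just(false))
                .onErrorReturn(NoSuchElementException.class, true)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(emptyLenient());    // []
        System.out.println(strictModeFails()); // true
    }
}
```

In both modes the function that builds the second flux is never invoked when the source is empty, which matches the "at most once" wording in the Javadoc.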

Statement:
This article is reproduced from stackoverflow.com. In case of any infringement, please contact admin@php.cn to have it removed.