Heim >Java >Reihenflüsse, bei denen der zweite Fluss im laufenden Betrieb mit dem letzten Wert des ersten Flusses erzeugt wird?

Reihenflüsse, bei denen der zweite Fluss im laufenden Betrieb mit dem letzten Wert des ersten Flusses erzeugt wird?

王林
王林nach vorne
2024-02-12 12:00:101197Durchsuche

PHP-Herausgeber Shinichi kann dieses Konzept prägnant und klar erklären, wenn er „Verkettete Flüsse, bei denen der zweite Fluss sofort mit dem letzten Wert des ersten Flusses erzeugt wird“ erklärt. Bei Reihenflüssen wird der Wert des ersten Flusses an den zweiten Fluss weitergegeben, und der Wert des zweiten Flusses wird im laufenden Betrieb basierend auf dem letzten Wert des ersten Flusses generiert. Mit diesem Mechanismus kann eine dynamische Datenübertragung und -verarbeitung implementiert werden, wodurch der Programmablauf flexibler und effizienter wird. Durch die rationelle Verwendung von seriellem Fluss können die Leistung und Wartbarkeit des Programms verbessert und eine bessere Benutzererfahrung bereitgestellt werden.

Frageninhalt

Ich habe vermutet, dass es sich um ein Duplikat handeln muss, habe aber einfach den falschen Begriff gegoogelt.

Ich habe zwei Flüsse a und b, aber b kann nur mit dem letzten Wert von a erstellt werden.

Ich möchte einen Fluss erstellen, der im Wesentlichen die Verkettung von a und b ist, aber die Erstellung von b wird verschoben, bis wir den letzten Wert von a erhalten.

Vielleicht sieht es so aus:

fluxC = fluxA.concatWith(lastA -> createFluxB(lastA))

Workaround

Ich weiß nicht, ob es in der Bibliothek eine Funktion gibt, die genau das tut.

Sie können einen solchen Operator jedoch erstellen, indem Sie:

  1. Zwischenspeichern des neuesten Werts des Eingabeverkehrs
  2. Erstellen Sie die folgende Sequenz mit der Standard-Concat-Operation, um den letzten Wert aus dem Cache-Stream abzurufen.

Hinweis: Es sollte nicht zu viel Overhead verursachen, da die Operation jeweils nur einen Wert zwischenspeichert und der zweite Teil des Algorithmus den zwischengespeicherten Wert direkt abrufen sollte, ohne den umgekehrten Quellenfluss auszulösen.

Hier ist eine Beispielimplementierung und ein Test:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class TestConcatLast {

    /**
     * Create a stream that emit all elements from input flux,
     * followed by all items from a flux created by provided function.
     * Input function is triggered using last element of source flux as input.
     *
     * @param source The flux providing elements of the first part of the concatenation.
     * @param createFromLastElement A function that provides the tail of the concatenation from a given element.
     *                              It will be triggered <em>at most once</em> using the last element of input flux.
     * @param errorIfSourceEmpty If true and input stream is empty, the returned flow will trigger an error.
     *                           If false, an empty flux is produced if input is empty.
     */
    public <T> Flux<T> concatLast(Flux<T> source, boolean errorIfSourceEmpty, Function<T, Flux<T>> createFromLastElement) {
        var sourceWithLatestCached = source.cache(1);
        final Mono<T> deferLast = Mono.defer(errorIfSourceEmpty ? sourceWithLatestCached::last : sourceWithLatestCached::next);
        return sourceWithLatestCached.concatWith(
                deferLast.flatMapMany(createFromLastElement)
        );
    }

    @Test
    public void testConcat() {
        var nextExpectedElement = new AtomicInteger(1);
        var elts = Flux.just(1, 2, 3, 4)
                // Check cache works and no element has been fetched back from source
                .doOnNext(i -> {
                    assert nextExpectedElement.compareAndSet(i, i+1);
                });

        var concatenated = concatLast(elts, true, i -> Flux.just(i + 1, i + 2, i + 3));
        StepVerifier.create(concatenated)
                .expectNext(1, 2, 3, 4, 5, 6, 7)
                .verifyComplete();
    }
}

Das obige ist der detaillierte Inhalt vonReihenflüsse, bei denen der zweite Fluss im laufenden Betrieb mit dem letzten Wert des ersten Flusses erzeugt wird?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen