首頁 >Java >串聯通量,其中第二個通量是與第一個通量的最後值即時創建的?

串聯通量,其中第二個通量是與第一個通量的最後值即時創建的?

王林
王林轉載
2024-02-12 12:00:101188瀏覽

php小編新一在解釋"串聯通量,其中第二個通量是與第一個通量的最後值即時創建的"時,可以簡潔明了地解釋這個概念。在串聯通量中,第一個通量的值會傳遞給第二個通量,而第二個通量的值則是根據第一個通量的最後值即時產生的。這種機制可以用來實現動態的資料傳遞和處理,使得程式的流程更加靈活和有效率。透過合理運用串聯通量,可以提升程式的效能和可維護性,提供更好的使用者體驗。

問題內容

我懷疑這肯定是重複的,但我只是在谷歌上搜尋了錯誤的術語。

我有兩個通量 a 和 b,但 b 只能使用 a 的最後一個值來建立。

我想創建一個本質上是 a 和 b 的串聯的通量,但 b 的創建會被推遲,直到我們獲得 a 的最後一個值。

也許,它看起來像這樣:

fluxC = fluxA.concatWith(lastA -> createFluxB(lastA))

解決方法

我不知道庫中是否有任何函數完全執行此操作。

但是,您可以透過以下方式製作這樣的運算子:

  1. 快取輸入流量的最新值
  2. 使用標準 concat 操作從快取流中取得最後一個值來建立以下序列。

注意:它不應該有太多開銷,因為該操作一次只緩存一個值,演算法的第二部分應該直接取回快取的值,而不會觸發反向來源通量.

這是一個範例實作和測試:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class TestConcatLast {

    /**
     * Create a stream that emit all elements from input flux,
     * followed by all items from a flux created by provided function.
     * Input function is triggered using last element of source flux as input.
     *
     * @param source The flux providing elements of the first part of the concatenation.
     * @param createFromLastElement A function that provides the tail of the concatenation from a given element.
     *                              It will be triggered <em>at most once</em> using the last element of input flux.
     * @param errorIfSourceEmpty If true and input stream is empty, the returned flow will trigger an error.
     *                           If false, an empty flux is produced if input is empty.
     */
    public <T> Flux<T> concatLast(Flux<T> source, boolean errorIfSourceEmpty, Function<T, Flux<T>> createFromLastElement) {
        var sourceWithLatestCached = source.cache(1);
        final Mono<T> deferLast = Mono.defer(errorIfSourceEmpty ? sourceWithLatestCached::last : sourceWithLatestCached::next);
        return sourceWithLatestCached.concatWith(
                deferLast.flatMapMany(createFromLastElement)
        );
    }

    @Test
    public void testConcat() {
        var nextExpectedElement = new AtomicInteger(1);
        var elts = Flux.just(1, 2, 3, 4)
                // Check cache works and no element has been fetched back from source
                .doOnNext(i -> {
                    assert nextExpectedElement.compareAndSet(i, i+1);
                });

        var concatenated = concatLast(elts, true, i -> Flux.just(i + 1, i + 2, i + 3));
        StepVerifier.create(concatenated)
                .expectNext(1, 2, 3, 4, 5, 6, 7)
                .verifyComplete();
    }
}

以上是串聯通量,其中第二個通量是與第一個通量的最後值即時創建的?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除