ホームページ  >  記事  >  Java  >  Spring WebFlux のリアクティブ チェーンでメソッドを非同期に実行するにはどうすればよいですか?

Spring WebFlux のリアクティブ チェーンでメソッドを非同期に実行するにはどうすればよいですか?

PHPz
PHPzオリジナル
2024-07-26 20:37:31554ブラウズ

How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

Project Reactor ベースのアプリケーションの既存のリアクティブ チェーン内でメソッドを非同期に実行しようとしています。 doUpdateLayoutInAsync メソッドは、重いバックグラウンド タスクを実行することを目的としていますが、私のアプローチは期待どおりに機能していないようです。私の現在の実装は次のとおりです:

public Mono<Boolean> publishPackage(String branchedPackageId) {
    PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
    publishingMetaDTO.setPublishEvent(true);

    return packageRepository
            .findById(branchedPackageId, packagePermission.getPublishPermission())
            .switchIfEmpty(Mono.error(new AppsmithException(
                    AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
            .flatMap(originalPackage -> {
                String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

                Package packageToBePublished = constructPackageToBePublished(originalPackage);

                originalPackage.setVersion(nextVersion);
                originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
                publishingMetaDTO.setOriginPackageId(branchedPackageId);
                publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

                Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
                Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
                Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);

                return unsetCurrentLatestMono
                        .then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
                        .flatMap(tuple2 -> {
                            Package publishedPackage = tuple2.getT2();
                            publishingMetaDTO.setPublishedPackage(publishedPackage);

                            return modulePackagePublishableService
                                    .publishEntities(publishingMetaDTO)
                                    .flatMap(publishedModules -> {
                                        if (publishedModules.isEmpty()) {
                                            return Mono.error(new AppsmithException(
                                                    AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
                                                    originalPackage.getUnpublishedPackage().getName()));
                                        }
                                        return moduleInstancePackagePublishableService
                                                .publishEntities(publishingMetaDTO)
                                                .then(Mono.defer(() ->
                                                        newActionPackagePublishableService.publishEntities(publishingMetaDTO))
                                                        .then(Mono.defer(() ->
                                                                actionCollectionPackagePublishableService
                                                                        .publishEntities(publishingMetaDTO))));
                                    })
                                    .then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
                        })
                        .as(transactionalOperator::transactional)
                        .then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
            });
}

private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
    Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
            .flatMap(pageId -> updateLayoutService
                    .updatePageLayoutsByPageId(pageId)
                    .onErrorResume(throwable -> {
                        log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
                        return Mono.just(pageId);
                    }))
            .collectList();

    // Running the updateLayoutsMono task asynchronously
    updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

    return Mono.just(Boolean.TRUE);
}

問題: 残りのリアクティブ チェーンが完了している間、doUpdateLayoutInAsync をバックグラウンドで実行したいと考えています。ただし、メソッドは同期的に実行されているようで、リアクティブ チェーンは期待どおりに継続しません。

質問: doUpdateLayoutInAsync が非同期で実行され、リアクティブ チェーンの継続をブロックしないようにするにはどうすればよいですか?

以上がSpring WebFlux のリアクティブ チェーンでメソッドを非同期に実行するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。