首页  >  文章  >  Java  >  如何在 Spring WebFlux 的反应链中异步运行方法?

如何在 Spring WebFlux 的反应链中异步运行方法?

PHPz
PHPz原创
2024-07-26 20:37:31547浏览

How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

我正在尝试在基于 Project Reactor 的应用程序中的现有反应链中异步执行方法。 doUpdateLayoutInAsync 方法旨在执行繁重的后台任务,但我的方法似乎没有按预期工作。这是我当前的实现:

public Mono<Boolean> publishPackage(String branchedPackageId) {
    PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
    publishingMetaDTO.setPublishEvent(true);

    return packageRepository
            .findById(branchedPackageId, packagePermission.getPublishPermission())
            .switchIfEmpty(Mono.error(new AppsmithException(
                    AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
            .flatMap(originalPackage -> {
                String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

                Package packageToBePublished = constructPackageToBePublished(originalPackage);

                originalPackage.setVersion(nextVersion);
                originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
                publishingMetaDTO.setOriginPackageId(branchedPackageId);
                publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

                Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
                Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
                Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);

                return unsetCurrentLatestMono
                        .then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
                        .flatMap(tuple2 -> {
                            Package publishedPackage = tuple2.getT2();
                            publishingMetaDTO.setPublishedPackage(publishedPackage);

                            return modulePackagePublishableService
                                    .publishEntities(publishingMetaDTO)
                                    .flatMap(publishedModules -> {
                                        if (publishedModules.isEmpty()) {
                                            return Mono.error(new AppsmithException(
                                                    AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
                                                    originalPackage.getUnpublishedPackage().getName()));
                                        }
                                        return moduleInstancePackagePublishableService
                                                .publishEntities(publishingMetaDTO)
                                                .then(Mono.defer(() ->
                                                        newActionPackagePublishableService.publishEntities(publishingMetaDTO))
                                                        .then(Mono.defer(() ->
                                                                actionCollectionPackagePublishableService
                                                                        .publishEntities(publishingMetaDTO))));
                                    })
                                    .then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
                        })
                        .as(transactionalOperator::transactional)
                        .then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
            });
}

private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
    Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
            .flatMap(pageId -> updateLayoutService
                    .updatePageLayoutsByPageId(pageId)
                    .onErrorResume(throwable -> {
                        log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
                        return Mono.just(pageId);
                    }))
            .collectList();

    // Running the updateLayoutsMono task asynchronously
    updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

    return Mono.just(Boolean.TRUE);
}

问题: 我希望 doUpdateLayoutInAsync 在后台运行,而反应链的其余部分完成。然而,该方法似乎是同步执行的,反应链并没有按预期继续。

问题:如何确保 doUpdateLayoutInAsync 异步运行并且不会阻止反应链的继续?

以上是如何在 Spring WebFlux 的反应链中异步运行方法?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn