首页  >  文章  >  Java  >  Saga 模式如何解决分布式事务问题:方法和实际示例

Saga 模式如何解决分布式事务问题:方法和实际示例

Linda Hamilton
Linda Hamilton原创
2024-10-20 20:11:02490浏览

1. 理解问题:分布式事务的复杂性

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

分布式事务涉及多个微服务,其中每个服务执行事务的一部分。例如,电子商务平台可能涉及支付、库存和订单管理等服务。这些服务需要协同工作才能完成交易。但是,如果其中一项服务失败会发生什么情况?

1.1 真实场景

想象一个电子商务应用程序,在下订单期间会发生以下步骤:

  • 第1步:从客户的账户中扣除付款。
  • 第 2 步:减少库存中的物品数量。
  • 第3步:在订单管理系统中创建订单。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

如果在扣款之后、创建订单之前库存服务失败,系统最终会处于不一致的状态。客户已付费,但未下订单。

1.2 传统解决方案及其局限性

为了处理此类故障,可以考虑使用具有两阶段提交协议的分布式事务。然而,这引入了几个问题:

  • 高延迟:每个服务必须在事务期间锁定资源,导致延迟增加。
  • 可用性降低:如果任何服务失败,整个事务将回滚,从而降低整体系统可用性。​​
  • 紧耦合:服务变得紧密耦合,使得扩展或修改单个服务变得更加困难。

2. Saga模式如何解决问题

在分布式系统中,事务通常跨越多个微服务。确保所有服务要么成功完成,要么根本没有完成是具有挑战性的。处理此问题的传统方法(使用具有两阶段提交的分布式事务)可能会因高延迟、紧密耦合和可用性降低等问题而出现问题。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

Saga 模式提供了一种更灵活的方法。 Saga 模式不是尝试将事务作为单个单元执行,而是将事务分解为可以独立执行的更小的、孤立的步骤。每个步骤都是一个本地事务,更新数据库,然后触发下一步。如果某个步骤失败,系统会执行补偿操作以撤消先前步骤所做的更改,确保系统可以返回到一致的状态。

2.1 什么是Saga模式?

Saga 模式本质上是一系列依次执行的较小事务。其工作原理如下:

  • 本地事务:事务中涉及的每个服务都执行自己的本地事务。例如,在订单处理系统中,一项服务可能处理付款,另一项服务处理库存,另一项服务处理订单记录。
  • 事件或消息发布:服务完成其本地事务后,它会发布事件或发送消息,指示该步骤已成功完成。例如,在处理付款后,付款服务可能会发布“PaymentCompleted”事件。
  • 触发下一步:序列中的下一个服务侦听事件,并在收到事件后继续其本地事务。这将持续到交易中的所有步骤完成为止。
  • 补偿操作:如果任何步骤失败,则调用补偿操作。这些操作旨在逆转前面步骤所做的更改。例如,如果付款后库存减少失败,补偿操作将退还付款。

2.2 传奇的类型

实现Saga模式主要有两种方式:编排编排

2.2.1 编排传奇

在编舞传奇中,没有中央协调员。相反,Saga 中涉及的每个服务都会侦听事件并根据先前步骤的结果决定何时采取行动。这种方法是分散的,允许服务独立运行。其工作原理如下:

  • 基于事件的协调:每个服务负责处理与其相关的事件。例如,支付服务处理完一笔支付后,会发出“PaymentCompleted”事件。库存服务侦听此事件,并在收到该事件时扣除商品计数。
  • 去中心化控制:由于没有中央协调器,每个服务必须根据收到的事件知道下一步要做什么。这为系统提供了更大的灵活性,但需要仔细规划以确保所有服务理解正确的操作顺序。
  • 补偿操作:如果服务检测到出现问题,它可以发出失败事件,其他服务会侦听该事件以触发补偿操作。例如,如果库存服务无法更新库存,它可能会发出“InventoryUpdateFailed”事件,支付服务会侦听该事件以触发退款。

编排的优点:

  • 松耦合:服务是松散耦合的,这使得扩展和修改单个服务变得更加容易。
  • 弹性:由于每个服务都是独立运行的,因此系统对各个服务的故障具有更强的弹性。

编舞的挑战:

  • 复杂性:随着服务数量的增长,管理和理解事件流可能会变得复杂。
  • 缺乏中央控制:如果没有中央协调器,监控和调试整体交易流程可能会更加困难。

2.2.2 编排传奇

在 Orchestration Saga 中,中央编排器控制事务流。协调器确定步骤的顺序并处理服务之间的通信。其工作原理如下:

  • 集中控制:编排器按顺序向每个服务发送命令。例如,编排器可能首先指示支付服务处理支付。完成后,它会告诉库存服务更新库存,依此类推。
  • 顺序执行:每个服务仅在协调器的指示下执行其任务,确保步骤按正确的顺序发生。
  • 补偿逻辑:协调器还负责在出现问题时启动补偿操作。例如,如果库存更新失败,编排器可以命令支付服务退款。

编排的优点:

  • 集中控制:使用单个编排器,可以更轻松地监控、管理和调试交易流程。
  • 更简单的逻辑:由于编排器处理流程,各个服务不需要了解整体事务顺序。

编排的挑战:

  • 单点故障:如果没有针对高可用性进行设计,编排器可能会成为瓶颈或单点故障。
  • 与 Orchestrator 的紧密耦合:服务依赖于 Orchestrator,与编排相比,这可能会使系统不够灵活。

3. 实现简单编排传奇模式:分步指南

让我们考虑电子商务场景并使用Saga模式来实现它。

在我们的咖啡购买场景中,每项服务都代表一次本地交易。咖啡服务充当这个传奇的协调者,协调其他服务来完成购买。

以下是这个传奇如何运作的详细说明:

  • 客户下订单:客户通过订单服务下订单。
  • 咖啡服务启动传奇:咖啡服务收到订单并启动传奇。
  • 订单服务创建一个订单:订单服务创建一个新订单并保留它。
  • 账单服务计算费用:账单服务计算订单的总费用并创建账单记录。
  • 付款服务处理付款:付款服务处理付款。
  • 咖啡服务更新订单状态:支付成功后,咖啡服务将订单状态更新为“已完成”。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

3.1 交易主体

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

在我的传奇实现中,每个 SagaItemBuilder 代表我们分布式事务流中的一个步骤。 ActionBuilder 定义了要执行的操作,包括主操作和发生错误时执行的回滚操作。 ActionBuilder 封装了三部分信息:

component :要调用的方法所在的 Bean 实例。

method :要调用的方法的名称。

args :要传递给方法的参数。

ActionBuilder

public class ActionBuilder {
    private Object component;
    private String method;
    private Object[] args;

    public static ActionBuilder builder() {
        return new ActionBuilder();
    }

    public ActionBuilder component(Object component) {
        this.component = component;
        return this;
    }

    public ActionBuilder method(String method) {
        this.method = method;
        return this;
    }

    public ActionBuilder args(Object... args) {
        this.args = args;
        return this;
    }

    public Object getComponent() { return component; }
    public String getMethod() { return method; }
    public Object[] getArgs() { return args; }
}

SagaItemBuilder

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SagaItemBuilder {
    private ActionBuilder action;
    private Map<Class<? extends Exception>, ActionBuilder> onBehaviour;

    public static SagaItemBuilder builder() {
        return new SagaItemBuilder();
    }

    public SagaItemBuilder action(ActionBuilder action) {
        this.action = action;
        return this;
    }

    public SagaItemBuilder onBehaviour(Class<? extends Exception> exception, ActionBuilder action) {
        if (Objects.isNull(onBehaviour)) onBehaviour = new HashMap<>();
        onBehaviour.put(exception, action);
        return this;
    }

    public ActionBuilder getAction() {
        return action;
    }

    public Map<Class<? extends Exception>, ActionBuilder> getBehaviour() {
        return onBehaviour;
    }
}

场景

import java.util.ArrayList;
import java.util.List;

public class Scenarios {
    List<SagaItemBuilder> scenarios;

    public static Scenarios builder() {
        return new Scenarios();
    }

    public Scenarios scenario(SagaItemBuilder sagaItemBuilder) {
        if (scenarios == null) scenarios = new ArrayList<>();
        scenarios.add(sagaItemBuilder);
        return this;
    }

    public List<SagaItemBuilder> getScenario() {
        return scenarios;
    }
}

以下是我如何提交分发事务。

package com.example.demo.saga;

import com.example.demo.saga.exception.CanNotRollbackException;
import com.example.demo.saga.exception.RollBackException;
import com.example.demo.saga.pojo.ActionBuilder;
import com.example.demo.saga.pojo.SagaItemBuilder;
import com.example.demo.saga.pojo.Scenarios;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.Set;

@Component
public class DTC {

    public boolean commit(Scenarios scenarios) throws Exception {
        validate(scenarios);
        for (int i = 0; i < scenarios.getScenario().size(); i++) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            ActionBuilder action = scenario.getAction();
            Object bean = action.getComponent();
            String method = action.getMethod();
            Object[] args = action.getArgs();

            try {
                invoke(bean, method, args);
            } catch (Exception e) {
                rollback(scenarios, i, e);
                return false;
            }
        }
        return true;
    }

    private void rollback(Scenarios scenarios, Integer failStep, Exception currentStepFailException) {
        for (int i = failStep; i >= 0; i--) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour();
            Set<Class<? extends Exception>> exceptions = behaviours.keySet();
            ActionBuilder actionWhenException = null;

            if (failStep == i) {
                for(Class<? extends Exception> exception: exceptions) {
                    if (exception.isInstance(currentStepFailException)) {
                        actionWhenException = behaviours.get(exception);
                    }
                }
                if (actionWhenException == null) actionWhenException = behaviours.get(RollBackException.class);
            } else {
                actionWhenException = behaviours.get(RollBackException.class);
            }

            Object bean = actionWhenException.getComponent();
            String method = actionWhenException.getMethod();
            Object[] args = actionWhenException.getArgs();
            try {
                invoke(bean, method, args);
            } catch (Exception e) {
                throw new CanNotRollbackException("Error in %s belong to %s. Can not rollback transaction".formatted(method, bean.getClass()));
            }
        }
    }

    private void validate(Scenarios scenarios) throws Exception {
        for (int i = 0; i < scenarios.getScenario().size(); i++) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            ActionBuilder action = scenario.getAction();
            if (action.getComponent() == null) throw new Exception("Missing bean in scenario");
            if (action.getMethod() == null) throw new Exception("Missing method in scenario");

            Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour();
            Set<Class<? extends Exception>> exceptions = behaviours.keySet();
            if (exceptions.contains(null)) throw new Exception("Exception can not be null in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass()));
            if (!exceptions.contains(RollBackException.class)) throw new Exception("Missing default RollBackException in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass()));
        }
    }

    public String invoke(Object bean, String methodName, Object... args) throws Exception {
        try {
            Class<?>[] paramTypes = new Class[args.length];
            for (int i = 0; i < args.length; i++) {
                paramTypes[i] = parameterType(args[i]);
            }
            Method method = bean.getClass().getDeclaredMethod(methodName, paramTypes);
            Object result = method.invoke(bean, args);
            return result != null ? result.toString() : null;
        } catch (Exception e) {
            throw e;
        }
    }

    private static Class<?> parameterType (Object o) {
        if (o instanceof Integer) {
           return int.class;
        } else if (o instanceof Boolean) {
            return boolean.class;
        } else if (o instanceof Double) {
            return double.class;
        } else if (o instanceof Float) {
            return float.class;
        } else if (o instanceof Long) {
            return long.class;
        } else if (o instanceof Short) {
            return short.class;
        } else if (o instanceof Byte) {
            return byte.class;
        } else if (o instanceof Character) {
            return char.class;
        } else {
            return o.getClass();
        }
    }
}

3.2 使用

我有 3 个调用外部服务的服务:BillingServiceOrderServicePaymentService

订单服务

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class OrderService {

    public String prepareOrder(String name, int number) {
        System.out.println("Prepare order for %s with order id %d ".formatted(name, number));
        return "Prepare order for %s with order id %d ".formatted(name, number);
    }

    public void Rollback_prepareOrder_NullPointException() {
        System.out.println("Rollback prepareOrder because NullPointException");
    }

    public void Rollback_prepareOrder_RollBackException() {
        System.out.println("Rollback prepareOrder because RollBackException");
    }
}

计费服务

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class BillingService {

    public String prepareBilling(String name, int number) {
        System.out.println("Prepare billing for %s with order id %d ".formatted(name, number));
        return "Prepare billing for %s with order id %d ".formatted(name, number);
    }

    public String createBilling(String name, int number) {
        System.out.println("Create billing for %s with order id %d ".formatted(name, number));
        return "Create billing for %s with order id %d ".formatted(name, number);
    }

    public void Rollback_prepareBilling_NullPointException() {
        System.out.println("Rollback prepareBilling because NullPointException");
    }

    public void Rollback_prepareBilling_ArrayIndexOutOfBoundsException() {
        System.out.println("Rollback prepareBilling because ArrayIndexOutOfBoundsException");
    }

    public void Rollback_prepareBilling_RollBackException() {
        System.out.println("Rollback prepareBilling because RollBackException");
    }

    public void Rollback_createBilling_NullPointException() {
        System.out.println("Rollback createBilling because NullPointException");
    }

    public void Rollback_createBilling_ArrayIndexOutOfBoundsException() {
        System.out.println("Rollback createBilling because ArrayIndexOutOfBoundsException");
    }

    public void Rollback_createBilling_RollBackException() {
        System.out.println("Rollback createBilling because RollBackException");
    }
}

付款服务

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class PaymentService {

    public String createPayment() {
        System.out.println("Create payment");
        return "Create payment";
    }

    public void Rollback_createPayment_NullPointException() {
        System.out.println("Rollback createPayment because NullPointException");
    }

    public void Rollback_createPayment_RollBackException() {
        System.out.println("Rollback createPayment because RollBackException");
    }
}

在 Coffee Service 中,我按如下方式实现它,创建一个场景,然后提交它。

package com.example.demo.service;

import com.example.demo.saga.DTC;
import com.example.demo.saga.exception.RollBackException;
import com.example.demo.saga.pojo.ActionBuilder;
import com.example.demo.saga.pojo.SagaItemBuilder;
import com.example.demo.saga.pojo.Scenarios;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CoffeeService {

    @Autowired
    private OrderService orderService;

    @Autowired
    private BillingService billingService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private DTC dtc;

    public String test() throws Exception {
        Scenarios scenarios = Scenarios.builder()
                .scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(orderService).method("prepareOrder").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_RollBackException").args())
                ).scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(billingService).method("prepareBilling").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_RollBackException").args())
                ).scenario(
                         SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(billingService).method("createBilling").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_ArrayIndexOutOfBoundsException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_RollBackException").args())
                ).scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(paymentService).method("createPayment").args())
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_RollBackException").args())
                );
        dtc.commit(scenarios);
        return "ok";
    }
}

3.3 结果

当我在创建账单时例外。

public String createBilling(String name, int number) {
    throw new NullPointerException();
}

结果

2024-08-24T14:21:45.445+07:00 INFO 19736 --- [demo] [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path '/'
2024-08-24T14:21:45.450+07:00 INFO 19736 --- [demo] [main] com.example.demo.DemoApplication : Started DemoApplication in 1.052 seconds (process running for 1.498)
2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2024-08-24T14:21:47.757+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
Prepare order for tuanh.net with order id 123 
Prepare billing for tuanh.net with order id 123 
Rollback createBilling because RollBackException
Rollback prepareBilling because RollBackException
Rollback prepareOrder because RollBackException

查看我的 GitHub 存储库

4. 结论

总之,Saga 模式通过将分布式事务分解为更小的、可管理的步骤,为管理分布式事务提供了一个强大的解决方案。编排和编排之间的选择取决于系统的特定需求和架构。编排提供松散耦合和弹性,而编排提供集中控制和更轻松的监控。通过使用 Saga 模式仔细设计系统,您可以在分布式微服务架构中实现一致性、可用性和灵活性。

如果您对在系统中实现 Saga 模式有任何疑问或需要进一步说明,请随时在下面发表评论!

阅读更多帖子:Saga 模式如何解决分布式事务问题:方法和实际示例

以上是Saga 模式如何解决分布式事务问题:方法和实际示例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn