首頁 >Java >java教程 >Saga 模式如何解決分散式事務問題:方法和實際範例

Saga 模式如何解決分散式事務問題:方法和實際範例

Linda Hamilton
Linda Hamilton原創
2024-10-20 20:11:02601瀏覽

1. 理解問題:分散式事務的複雜性

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

分散式事務涉及多個微服務,其中每個服務執行事務的一部分。例如,電子商務平台可能涉及支付、庫存和訂單管理等服務。這些服務需要協同工作才能完成交易。但是,如果其中一項服務失敗會發生什麼情況?

1.1 真實場景

想像一個電子商務應用程序,在下訂單期間會發生以下步驟:

  • 第1步:從客戶的帳戶中扣除付款。
  • 第 2 步:減少庫存中的物品數量。
  • 第3步:在訂單管理系統中建立訂單。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

如果在扣款之後、建立訂單之前庫存服務失敗,系統最終會處於不一致的狀態。客戶已付費,但未下訂單。

1.2 傳統解決方案及其局限性

為了處理此類故障,可以考慮使用具有兩階段提交協定的分散式事務。然而,這引入了幾個問題:

  • 高延遲:每個服務必須在交易期間鎖定資源,導致延遲增加。
  • 可用性降低:如果任何服務失敗,整個事務將回滾,從而降低整體系統可用性。
  • 緊密耦合:服務變得緊密耦合,使得擴展或修改單一服務變得更加困難。

2. Saga模式如何解決問題

在分散式系統中,事務通常跨越多個微服務。確保所有服務要么成功完成,要么根本沒有完成是具有挑戰性的。處理此問題的傳統方法(使用具有兩階段提交的分散式事務)可能會因高延遲、緊密耦合和可用性降低等問題而出現問題。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

Saga 模式提供了一種更靈活的方法。 Saga 模式不是嘗試將事務作為單一單元執行,而是將事務分解為可以獨立執行的較小的、孤立的步驟。每個步驟都是一個本地事務,更新資料庫,然後觸發下一步。如果某個步驟失敗,系統會執行補償操作以撤銷先前步驟所做的更改,確保系統可以回到一致的狀態。

2.1 什麼是Saga模式?

Saga 模式本質上是一系列依序執行的較小事務。其工作原理如下:

  • 本地事務:事務中涉及的每個服務都執行自己的本地事務。例如,在訂單處理系統中,一項服務可能處理付款,另一項服務處理庫存,另一項服務處理訂單記錄。
  • 事件或訊息發布:服務完成其本地事務後,它會發布事件或發送訊息,指示該步驟已成功完成。例如,在處理付款後,付款服務可能會發布「PaymentCompleted」事件。
  • 觸發下一步:序列中的下一個服務偵聽事件,並在收到事件後繼續其本地事務。這將持續到交易中的所有步驟完成為止。
  • 補償操作:如果任何步驟失敗,則呼叫補償操作。這些操作旨在逆轉前面步驟所做的變更。例如,如果付款後庫存減少失敗,補償操作將退還付款。

2.2 傳奇的類型

實現Saga模式主要有兩種方式:編排編排

2.2.1 編排傳奇

在編舞傳奇中,沒有中央協調員。相反,Saga 中涉及的每個服務都會偵聽事件並根據先前步驟的結果決定何時採取行動。這種方法是分散的,允許服務獨立運作。其工作原理如下:

  • 基於事件的協調:每個服務負責處理與其相關的事件。例如,支付服務處理完一筆付款後,會發出「PaymentCompleted」事件。庫存服務偵聽此事件,並在收到事件時扣除商品計數。
  • 去中心化控制:由於沒有中央協調器,每個服務都必須根據收到的事件知道下一步要做什麼。這為系統提供了更大的靈活性,但需要仔細規劃以確保所有服務都理解正確的操作順序。
  • 補償操作:如果服務偵測到出現問題,它可以發出失敗事件,其他服務會偵聽該事件以觸發補償操作。例如,如果庫存服務無法更新庫存,它可能會發出「InventoryUpdateFailed」事件,支付服務會偵聽該事件以觸發退款。

編排的優點:

  • 鬆散耦合:服務是鬆散耦合的,這使得擴展和修改單一服務變得更加容易。
  • 彈性:由於每個服務都是獨立運作的,因此系統對各個服務的故障具有更強的彈性。

編舞的挑戰:

  • 複雜性:隨著服務數量的增長,管理和理解事件流可能會變得複雜。
  • 缺乏中央控制:如果沒有中央協調器,監控和調試整體交易流程可能會更加困難。

2.2.2 編排傳奇

在 Orchestration Saga 中,中央編排器控制事務流。協調器決定步驟的順序並處理服務之間的通訊。其工作原理如下:

  • 集中控制:編排器依序傳送指令給每個服務。例如,編排器可能首先指示支付服務處理支付。完成後,它會告訴庫存服務更新庫存,依此類推。
  • 順序執行:每個服務僅在協調器的指示下執行其任務,確保步驟以正確的順序發生。
  • 補償邏輯:協調器也負責在出現問題時啟動補償操作。例如,如果庫存更新失敗,編排器可以命令支付服務退款。

編排的優點:

  • 集中控制:使用單一編排器,可以更輕鬆地監控、管理和偵錯交易流程。
  • 更簡單的邏輯:由於編排器處理流程,各個服務不需要了解整體事務順序。

編排的挑戰:

  • 單點故障:如果沒有針對高可用性進行設計,編排器可能會成為瓶頸或單點故障。
  • 與 Orchestrator 的緊密耦合:服務依賴 Orchestrator,與編排相比,這可能會使系統不夠靈活。

3. 實現簡單編排傳奇模式:逐步指南

讓我們考慮電子商務場景並使用Saga模式來實現它。

在我們的咖啡購買場景中,每項服務都代表一次本地交易。咖啡服務充當這個傳奇的協調者,協調其他服務來完成購買。

以下是這個傳奇如何運作的詳細說明:

  • 客戶下訂單:客戶透過訂單服務下訂單。
  • 咖啡服務啟動傳奇:咖啡服務收到訂單並啟動傳奇。
  • 訂單服務建立一個訂單:訂單服務建立一個新訂單並保留它。
  • 帳單服務計算費用:帳單服務計算訂單的總費用並建立帳單記錄。
  • 付款服務處理付款:付款服務處理付款。
  • 咖啡服務更新訂單狀態:付款成功後,咖啡服務將訂單狀態更新為「完成」。

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

3.1 交易主體

How the Saga Pattern Resolves Distributed Transaction Issues: Methods and Real-World Example

在我的傳奇實作中,每個 SagaItemBuilder 代表我們分散式事務流程中的一個步驟。 ActionBuilder 定義了要執行的操作,包括主操作和發生錯誤時執行的回溯操作。 ActionBuilder 封裝了三部分資訊:

component :要呼叫的方法所在的 Bean 實例。

method :要呼叫的方法的名稱。

args :要傳遞給方法的參數。

ActionBuilder

public class ActionBuilder {
    private Object component;
    private String method;
    private Object[] args;

    public static ActionBuilder builder() {
        return new ActionBuilder();
    }

    public ActionBuilder component(Object component) {
        this.component = component;
        return this;
    }

    public ActionBuilder method(String method) {
        this.method = method;
        return this;
    }

    public ActionBuilder args(Object... args) {
        this.args = args;
        return this;
    }

    public Object getComponent() { return component; }
    public String getMethod() { return method; }
    public Object[] getArgs() { return args; }
}

SagaItemBuilder

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SagaItemBuilder {
    private ActionBuilder action;
    private Map<Class<? extends Exception>, ActionBuilder> onBehaviour;

    public static SagaItemBuilder builder() {
        return new SagaItemBuilder();
    }

    public SagaItemBuilder action(ActionBuilder action) {
        this.action = action;
        return this;
    }

    public SagaItemBuilder onBehaviour(Class<? extends Exception> exception, ActionBuilder action) {
        if (Objects.isNull(onBehaviour)) onBehaviour = new HashMap<>();
        onBehaviour.put(exception, action);
        return this;
    }

    public ActionBuilder getAction() {
        return action;
    }

    public Map<Class<? extends Exception>, ActionBuilder> getBehaviour() {
        return onBehaviour;
    }
}

場景

import java.util.ArrayList;
import java.util.List;

public class Scenarios {
    List<SagaItemBuilder> scenarios;

    public static Scenarios builder() {
        return new Scenarios();
    }

    public Scenarios scenario(SagaItemBuilder sagaItemBuilder) {
        if (scenarios == null) scenarios = new ArrayList<>();
        scenarios.add(sagaItemBuilder);
        return this;
    }

    public List<SagaItemBuilder> getScenario() {
        return scenarios;
    }
}

以下是我如何提交分發事務。

package com.example.demo.saga;

import com.example.demo.saga.exception.CanNotRollbackException;
import com.example.demo.saga.exception.RollBackException;
import com.example.demo.saga.pojo.ActionBuilder;
import com.example.demo.saga.pojo.SagaItemBuilder;
import com.example.demo.saga.pojo.Scenarios;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.Set;

@Component
public class DTC {

    public boolean commit(Scenarios scenarios) throws Exception {
        validate(scenarios);
        for (int i = 0; i < scenarios.getScenario().size(); i++) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            ActionBuilder action = scenario.getAction();
            Object bean = action.getComponent();
            String method = action.getMethod();
            Object[] args = action.getArgs();

            try {
                invoke(bean, method, args);
            } catch (Exception e) {
                rollback(scenarios, i, e);
                return false;
            }
        }
        return true;
    }

    private void rollback(Scenarios scenarios, Integer failStep, Exception currentStepFailException) {
        for (int i = failStep; i >= 0; i--) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour();
            Set<Class<? extends Exception>> exceptions = behaviours.keySet();
            ActionBuilder actionWhenException = null;

            if (failStep == i) {
                for(Class<? extends Exception> exception: exceptions) {
                    if (exception.isInstance(currentStepFailException)) {
                        actionWhenException = behaviours.get(exception);
                    }
                }
                if (actionWhenException == null) actionWhenException = behaviours.get(RollBackException.class);
            } else {
                actionWhenException = behaviours.get(RollBackException.class);
            }

            Object bean = actionWhenException.getComponent();
            String method = actionWhenException.getMethod();
            Object[] args = actionWhenException.getArgs();
            try {
                invoke(bean, method, args);
            } catch (Exception e) {
                throw new CanNotRollbackException("Error in %s belong to %s. Can not rollback transaction".formatted(method, bean.getClass()));
            }
        }
    }

    private void validate(Scenarios scenarios) throws Exception {
        for (int i = 0; i < scenarios.getScenario().size(); i++) {
            SagaItemBuilder scenario = scenarios.getScenario().get(i);
            ActionBuilder action = scenario.getAction();
            if (action.getComponent() == null) throw new Exception("Missing bean in scenario");
            if (action.getMethod() == null) throw new Exception("Missing method in scenario");

            Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour();
            Set<Class<? extends Exception>> exceptions = behaviours.keySet();
            if (exceptions.contains(null)) throw new Exception("Exception can not be null in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass()));
            if (!exceptions.contains(RollBackException.class)) throw new Exception("Missing default RollBackException in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass()));
        }
    }

    public String invoke(Object bean, String methodName, Object... args) throws Exception {
        try {
            Class<?>[] paramTypes = new Class[args.length];
            for (int i = 0; i < args.length; i++) {
                paramTypes[i] = parameterType(args[i]);
            }
            Method method = bean.getClass().getDeclaredMethod(methodName, paramTypes);
            Object result = method.invoke(bean, args);
            return result != null ? result.toString() : null;
        } catch (Exception e) {
            throw e;
        }
    }

    private static Class<?> parameterType (Object o) {
        if (o instanceof Integer) {
           return int.class;
        } else if (o instanceof Boolean) {
            return boolean.class;
        } else if (o instanceof Double) {
            return double.class;
        } else if (o instanceof Float) {
            return float.class;
        } else if (o instanceof Long) {
            return long.class;
        } else if (o instanceof Short) {
            return short.class;
        } else if (o instanceof Byte) {
            return byte.class;
        } else if (o instanceof Character) {
            return char.class;
        } else {
            return o.getClass();
        }
    }
}

3.2 使用

我有 3 個呼叫外部服務的服務:BillingServiceOrderServicePaymentService

訂單服務

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class OrderService {

    public String prepareOrder(String name, int number) {
        System.out.println("Prepare order for %s with order id %d ".formatted(name, number));
        return "Prepare order for %s with order id %d ".formatted(name, number);
    }

    public void Rollback_prepareOrder_NullPointException() {
        System.out.println("Rollback prepareOrder because NullPointException");
    }

    public void Rollback_prepareOrder_RollBackException() {
        System.out.println("Rollback prepareOrder because RollBackException");
    }
}

計費服務

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class BillingService {

    public String prepareBilling(String name, int number) {
        System.out.println("Prepare billing for %s with order id %d ".formatted(name, number));
        return "Prepare billing for %s with order id %d ".formatted(name, number);
    }

    public String createBilling(String name, int number) {
        System.out.println("Create billing for %s with order id %d ".formatted(name, number));
        return "Create billing for %s with order id %d ".formatted(name, number);
    }

    public void Rollback_prepareBilling_NullPointException() {
        System.out.println("Rollback prepareBilling because NullPointException");
    }

    public void Rollback_prepareBilling_ArrayIndexOutOfBoundsException() {
        System.out.println("Rollback prepareBilling because ArrayIndexOutOfBoundsException");
    }

    public void Rollback_prepareBilling_RollBackException() {
        System.out.println("Rollback prepareBilling because RollBackException");
    }

    public void Rollback_createBilling_NullPointException() {
        System.out.println("Rollback createBilling because NullPointException");
    }

    public void Rollback_createBilling_ArrayIndexOutOfBoundsException() {
        System.out.println("Rollback createBilling because ArrayIndexOutOfBoundsException");
    }

    public void Rollback_createBilling_RollBackException() {
        System.out.println("Rollback createBilling because RollBackException");
    }
}

付款服務

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class PaymentService {

    public String createPayment() {
        System.out.println("Create payment");
        return "Create payment";
    }

    public void Rollback_createPayment_NullPointException() {
        System.out.println("Rollback createPayment because NullPointException");
    }

    public void Rollback_createPayment_RollBackException() {
        System.out.println("Rollback createPayment because RollBackException");
    }
}

在 Coffee Service 中,我如下實作它,建立一個場景,然後提交它。

package com.example.demo.service;

import com.example.demo.saga.DTC;
import com.example.demo.saga.exception.RollBackException;
import com.example.demo.saga.pojo.ActionBuilder;
import com.example.demo.saga.pojo.SagaItemBuilder;
import com.example.demo.saga.pojo.Scenarios;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CoffeeService {

    @Autowired
    private OrderService orderService;

    @Autowired
    private BillingService billingService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private DTC dtc;

    public String test() throws Exception {
        Scenarios scenarios = Scenarios.builder()
                .scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(orderService).method("prepareOrder").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_RollBackException").args())
                ).scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(billingService).method("prepareBilling").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_RollBackException").args())
                ).scenario(
                         SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(billingService).method("createBilling").args("tuanh.net", 123))
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_ArrayIndexOutOfBoundsException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_RollBackException").args())
                ).scenario(
                        SagaItemBuilder.builder()
                                .action(ActionBuilder.builder().component(paymentService).method("createPayment").args())
                                .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_NullPointException").args())
                                .onBehaviour(RollBackException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_RollBackException").args())
                );
        dtc.commit(scenarios);
        return "ok";
    }
}

3.3 結果

當我在建立帳單時例外。

public String createBilling(String name, int number) {
    throw new NullPointerException();
}

結果

2024-08-24T14:21:45.445+07:00 INFO 19736 --- [demo] [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path '/'
2024-08-24T14:21:45.450+07:00 INFO 19736 --- [demo] [main] com.example.demo.DemoApplication : Started DemoApplication in 1.052 seconds (process running for 1.498)
2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2024-08-24T14:21:47.757+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
Prepare order for tuanh.net with order id 123 
Prepare billing for tuanh.net with order id 123 
Rollback createBilling because RollBackException
Rollback prepareBilling because RollBackException
Rollback prepareOrder because RollBackException

查看我的 GitHub 儲存庫

4. 結論

總之,Saga 模式透過將分散式事務分解為更小的、可管理的步驟,為管理分散式事務提供了一個強大的解決方案。編排和編排之間的選擇取決於系統的特定需求和架構。編排提供鬆散耦合和彈性,而編排提供集中控制和更輕鬆的監控。透過使用 Saga 模式仔細設計系統,您可以在分散式微服務架構中實現一致性、可用性和靈活性。

如果您對在系統中實現 Saga 模式有任何疑問或需要進一步說明,請隨時在下面發表評論!

閱讀更多文章:Saga 模式如何解決分散式交易問題:方法與實際範例

以上是Saga 模式如何解決分散式事務問題:方法和實際範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn