首页 >后端开发 >Golang >使用通用框架在 Go 中构建健壮的 SQL 事务执行

使用通用框架在 Go 中构建健壮的 SQL 事务执行

DDD
DDD原创
2024-12-11 10:04:10702浏览

Building Robust SQL Transaction Execution in Go with a Generic Framework

在 Go 中使用 SQL 数据库时,确保原子性并在多步骤事务期间管理回滚可能具有挑战性。在本文中,我将指导您创建一个健壮、可重用且可测试的框架,用于在 Go 中执行 SQL 事务,并使用泛型来实现灵活性。

我们将构建一个 SqlWriteExec 实用程序,用于在事务中执行多个相关数据库操作。它支持无状态和有状态操作,支持复杂的工作流程,例如插入相关实体,同时无缝管理依赖关系。

为什么我们需要 SQL 事务框架?

在实际应用中,数据库操作很少是孤立的。考虑以下场景:

插入用户并自动更新其库存。
创建订单并处理付款,确保一致性。
由于涉及多个步骤,在故障期间管理回滚对于确保数据完整性至关重要。

在 Txn 管理中使用 go。

如果您正在编写数据库交易,那么在编写核心逻辑之前您可能需要考虑几个样板。虽然这个 txn 管理是由 java 中的 spring boot 管理的,并且在用 java 编写代码时你从来没有太在意这些,但在 golang 中却不是这样。下面提供了一个简单的示例

func basicTxn(db *sql.DB) error {
    // start a transaction
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
        } else if err != nil {
            tx.Rollback()
        } else {
            tx.Commit()
        }
    }()

    // insert data into the orders table
    _, err = tx.Exec("INSERT INTO orders (id, customer_name, order_date) VALUES (1, 'John Doe', '2022-01-01')")
    if err != nil {
        return err
    }
    return nil
}

我们不能指望为每个函数重复回滚/提交代码。这里我们有两个选择,要么创建一个类,该类将提供一个函数作为返回类型,该类在 defer 中执行时将提交/回滚 txn,要么创建一个包装类,它将所有 txn 函数包装在一起并一次性执行。

我选择了后面的选择,代码的更改如下所示。

func TestSqlWriteExec_CreateOrderTxn(t *testing.T) {

    db := setupDatabase()
    // create a new SQL Write Executor
    err := dbutils.NewSqlTxnExec[OrderRequest, OrderProcessingResponse](context.TODO(), db, nil, &OrderRequest{CustomerName: "CustomerA", ProductID: 1, Quantity: 10}).
        StatefulExec(InsertOrder).
        StatefulExec(UpdateInventory).
        StatefulExec(InsertShipment).
        Commit()
    // check if the transaction was committed successfully
    if err != nil {
        t.Fatal(err)
        return
    }
    verifyTransactionSuccessful(t, db)
    t.Cleanup(
        func() { 
            cleanup(db)
            db.Close() 
        },
    )
}
func InsertOrder(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error {
    // Insert Order
    result, err := txn.Exec("INSERT INTO orders (customer_name, product_id, quantity) VALUES (, , )", order.CustomerName, order.ProductID, order.Quantity)
    if err != nil {
        return err
    }
    // Get the inserted Order ID
    orderProcessing.OrderID, err = result.LastInsertId()
    return err
}

func UpdateInventory(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error {
    // Update Inventory if it exists and the quantity is greater than the quantity check if it exists
    result, err := txn.Exec("UPDATE inventory SET product_quantity = product_quantity -  WHERE id =  AND product_quantity >= ", order.Quantity, order.ProductID)
    if err != nil {
        return err
    }
    // Get the number of rows affected
    rowsAffected, err := result.RowsAffected()
    if rowsAffected == 0 {
        return errors.New("Insufficient inventory")
    }
    return err
}

func InsertShipment(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error {
    // Insert Shipment
    result, err := txn.Exec("INSERT INTO shipping_info (customer_name, shipping_address) VALUES (, 'Shipping Address')", order.CustomerName)
    if err != nil {
        return err
    }
    // Get the inserted Shipping ID
    orderProcessing.ShippingID, err = result.LastInsertId()
    return err
}

这段代码将会更加精确和简洁。

核心逻辑是如何实现的

这个想法是将 txn 隔离到单个 go 结构,以便它可以接受多个 txn。我所说的 txn 是指将使用我们为类创建的 txn 执行操作的函数。

type TxnFn[T any] func(ctx context.Context, txn *sql.Tx, processingReq *T) error
type StatefulTxnFn[T any, R any] func(ctx context.Context, txn *sql.Tx, processingReq *T, processedRes *R) error

这两个是函数类型,它们将接受 txn 来处理某些内容。现在在数据层中实现创建一个像这样的函数并将其传递给执行器类,该执行器类负责注入参数并执行该函数。

// SQL Write Executor is responsible when executing write operations
// For dependent writes you may need to add the dependent data to processReq and proceed to the next function call
type SqlTxnExec[T any, R any] struct {
    db               *sql.DB
    txn              *sql.Tx
    txnFns         []TxnFn[T]
    statefulTxnFns []StatefulTxnFn[T, R]
    processingReq    *T
    processedRes     *R
    ctx              context.Context
    err              error
}

这是我们存储所有 txn_fn 详细信息的地方,我们将使用 Commit() 方法来尝试提交 txn。

func (s *SqlTxnExec[T, R]) Commit() (err error) {
    defer func() {
        if p := recover(); p != nil {
            s.txn.Rollback()
            panic(p)
        } else if err != nil {
            err = errors.Join(err, s.txn.Rollback())
        } else {
            err = errors.Join(err, s.txn.Commit())
        }
        return
    }()

    for _, writeFn := range s.txnFns {
        if err = writeFn(s.ctx, s.txn, s.processingReq); err != nil {
            return
        }
    }

    for _, statefulWriteFn := range s.statefulTxnFns {
        if err = statefulWriteFn(s.ctx, s.txn, s.processingReq, s.processedRes); err != nil {
            return
        }
    }
    return
}

您可以在存储库中找到更多示例和测试 -
https://github.com/mahadev-k/go-utils/tree/main/examples

虽然现在我们偏向分布式系统和共识协议,但我们仍然使用sql并且它仍然存在。

如果有人愿意贡献并在此基础上继续发展,请告诉我!!
感谢您阅读本文!!
https://in.linkedin.com/in/mahadev-k-934520223
https://x.com/mahadev_k_

以上是使用通用框架在 Go 中构建健壮的 SQL 事务执行的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn