首頁 >後端開發 >Golang >在 Go 中建立健全的任務執行上下文

在 Go 中建立健全的任務執行上下文

Susan Sarandon
Susan Sarandon原創
2025-01-01 01:02:10295瀏覽

Building a Robust Task Execution Context in Go

這可能是我最後一次在 Go 中進行錯誤處理。我認為這也是最好的。我們知道我們執行的每條指令都在上下文中。並且上下文可能有錯誤。這時我想為什麼不簡單地在當前上下文之上創建一個包裝器。因此,如果透過特定 fn 執行所有任務,那麼我們可以檢查 ctx 是否有錯誤,如果有錯誤,則不執行,否則執行並收集錯誤。這可能會成為一種反模式,但在它成為之前,我們可以嘗試一下。

好吧,遊標有一些東西要加 ->

問題

處理併發任務時請考慮以下常見挑戰:

  1. 從多個 goroutine 收集錯誤
  2. 維護線程安全
  3. 限制並發執行
  4. 收集所有錯誤的同時保留第一個錯誤
  5. 乾淨的錯誤處理模式

解決方案:TaskContext

讓我們建立一個 TaskContext 來解決這些問題:

package taskctx

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

type RunFn[T any] func() (T, error)

type TaskContext struct {
    context.Context
    mu       sync.RWMutex
    err      error
    multiErr []error
}

func NewTaskContext(parent context.Context) *TaskContext {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    return &TaskContext{Context: parent}
}

主要特點

1. 線程安全的錯誤處理

func (c *TaskContext) WithError(err error) *TaskContext {
    if err == nil {
        return c
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    c.multiErr = append(c.multiErr, err)
    if c.err == nil {
        c.err = err
    } else {
        c.err = errors.Join(c.err, err)
    }
    return c
}

2. 單任務執行

func Run[T any](ctx *TaskContext, fn RunFn[T]) T {
    var zero T
    if err := ctx.Err(); err != nil {
        return zero
    }

    result, err := fn()
    if err != nil {
        ctx.WithError(err)
        return zero
    }
    return result
}

3. 並行任務執行

func RunParallel[T any](ctx *TaskContext, fns ...func() (T, error)) ([]T, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    results := make([]T, len(fns))
    var resultsMu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(len(fns))

    for i, fn := range fns {
        i, fn := i, fn
        go func() {
            defer wg.Done()
            result, err := fn()
            if err != nil {
                ctx.AddError(fmt.Errorf("task %d: %w", i+1, err))
            } else {
                resultsMu.Lock()
                results[i] = result
                resultsMu.Unlock()
            }
        }()
    }

    wg.Wait()
    return results, ctx.Errors()
}

4. 受控並發

func RunParallelWithLimit[T any](ctx *TaskContext, limit int, fns ...func() (T, error)) ([]T, error) {
    // ... similar to RunParallel but with semaphore ...
    sem := make(chan struct{}, limit)
    // ... implementation ...
}

使用範例

簡單的任務執行

func ExampleTaskContext_ShipmentProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    order := dummyOrder()
    shipment := dummyShipment()

    // Step 1: Validate address
    // Step 2: Calculate shipping cost
    // Step 3: Generate label
    _ = goctx.Run(ctx, validateAddress("123 Main St"))
    cost := goctx.Run(ctx, calculateShipping(order))
    trackingNum := goctx.Run(ctx, generateLabel(shipment.OrderID, cost))

    if ctx.Err() != nil {
        fmt.Printf("Error: %v\n", ctx.Err())
        return
    }

    shipment.Status = "READY"
    shipment.TrackingNum = trackingNum
    fmt.Printf("Shipment processed: %+v\n", shipment)

    // Output:
    // Shipment processed: {OrderID:ORD123 Status:READY TrackingNum:TRACK-ORD123-1234567890}
}

平行任務執行

func ExampleTaskContext_OrderProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    // Mock order
    order := []OrderItem{
        {ProductID: "LAPTOP", Quantity: 2},
        {ProductID: "MOUSE", Quantity: 3},
    }

    taskCtx := goctx.NewTaskContext(ctx)

    // Create inventory checks for each item
    inventoryChecks := goctx.Run[[]goctx.RunFn[bool]](taskCtx,
        func() ([]goctx.RunFn[bool], error) {
            return streams.NewTransformer[OrderItem, goctx.RunFn[bool]](order).
                Transform(streams.MapItSimple(checkInventory)).
                Result()
        })

    // Run inventory checks in parallel
    _, err := goctx.RunParallel(ctx, inventoryChecks...)
    fmt.Printf("Inventory check error: %v\n", err)

    // Output:
    // Inventory check error: task 1: insufficient inventory for LAPTOP
}

好處

  1. 執行緒安全性:所有操作都受到互斥鎖的保護
  2. 錯誤集合:維護第一個錯誤和所有錯誤
  3. 上下文整合:與Go的上下文套件一起使用
  4. 通用支援:適用於任何回傳類型
  5. 並發控制:限制並行執行的內建支援

測試

以下是測試實作的方法:

func TestTaskContext(t *testing.T) {
    t.Run("handles parallel errors", func(t *testing.T) {
        ctx := NewTaskContext(context.Background())
        _, err := RunParallel(ctx,
            func() (int, error) { return 0, errors.New("error 1") },
            func() (int, error) { return 0, errors.New("error 2") },
        )
        assert.Error(t, err)
        assert.Contains(t, err.Error(), "error 1")
        assert.Contains(t, err.Error(), "error 2")
    })
}

結論

這個 TaskContext 實作提供了一個強大的解決方案,可以在 Go 中透過正確的錯誤處理來處理並發任務執行。當您需要執行以下操作時,它特別有用:

  • 同時執行多個任務
  • 收集所有任務的錯誤
  • 限制並發執行
  • 維護線程安全
  • 收集所有錯誤的同時追蹤第一個錯誤

完整程式碼可在 GitHub 上取得。

資源

  • Go 上下文包
  • Go 並發模式
  • Go 中的錯誤處理

您使用什麼模式來處理 Go 中的並發任務執行?在下面的評論中分享你的想法!

  • https://x.com/mahadev_k_
  • https://in.linkedin.com/in/mahadev-k-934520223

以上是在 Go 中建立健全的任務執行上下文的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn