首页 >后端开发 >Golang >在 Go 中构建健壮的任务执行上下文

在 Go 中构建健壮的任务执行上下文

Susan Sarandon
Susan Sarandon原创
2025-01-01 01:02:10317浏览

Building a Robust Task Execution Context in Go

这可能是我最后一次在 Go 中进行错误处理。我认为这也是最好的。我们知道我们执行的每条指令都在上下文中。并且上下文可能有错误。这时我想为什么不简单地在当前上下文之上创建一个包装器。因此,如果通过特定 fn 执行所有任务,那么我们可以检查 ctx 是否有错误,如果有错误,则不执行,否则执行并收集错误。这可能会成为一种反模式,但在它成为之前,我们可以尝试一下。

好吧,光标有一些东西要添加 ->

问题

处理并发任务时请考虑以下常见挑战:

  1. 从多个 goroutine 收集错误
  2. 维护线程安全
  3. 限制并发执行
  4. 收集所有错误的同时保留第一个错误
  5. 干净的错误处理模式

解决方案:TaskContext

让我们构建一个 TaskContext 来解决这些问题:

package taskctx

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

type RunFn[T any] func() (T, error)

type TaskContext struct {
    context.Context
    mu       sync.RWMutex
    err      error
    multiErr []error
}

func NewTaskContext(parent context.Context) *TaskContext {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    return &TaskContext{Context: parent}
}

主要特点

1. 线程安全的错误处理

func (c *TaskContext) WithError(err error) *TaskContext {
    if err == nil {
        return c
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    c.multiErr = append(c.multiErr, err)
    if c.err == nil {
        c.err = err
    } else {
        c.err = errors.Join(c.err, err)
    }
    return c
}

2. 单任务执行

func Run[T any](ctx *TaskContext, fn RunFn[T]) T {
    var zero T
    if err := ctx.Err(); err != nil {
        return zero
    }

    result, err := fn()
    if err != nil {
        ctx.WithError(err)
        return zero
    }
    return result
}

3. 并行任务执行

func RunParallel[T any](ctx *TaskContext, fns ...func() (T, error)) ([]T, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    results := make([]T, len(fns))
    var resultsMu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(len(fns))

    for i, fn := range fns {
        i, fn := i, fn
        go func() {
            defer wg.Done()
            result, err := fn()
            if err != nil {
                ctx.AddError(fmt.Errorf("task %d: %w", i+1, err))
            } else {
                resultsMu.Lock()
                results[i] = result
                resultsMu.Unlock()
            }
        }()
    }

    wg.Wait()
    return results, ctx.Errors()
}

4. 受控并发

func RunParallelWithLimit[T any](ctx *TaskContext, limit int, fns ...func() (T, error)) ([]T, error) {
    // ... similar to RunParallel but with semaphore ...
    sem := make(chan struct{}, limit)
    // ... implementation ...
}

使用示例

简单的任务执行

func ExampleTaskContext_ShipmentProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    order := dummyOrder()
    shipment := dummyShipment()

    // Step 1: Validate address
    // Step 2: Calculate shipping cost
    // Step 3: Generate label
    _ = goctx.Run(ctx, validateAddress("123 Main St"))
    cost := goctx.Run(ctx, calculateShipping(order))
    trackingNum := goctx.Run(ctx, generateLabel(shipment.OrderID, cost))

    if ctx.Err() != nil {
        fmt.Printf("Error: %v\n", ctx.Err())
        return
    }

    shipment.Status = "READY"
    shipment.TrackingNum = trackingNum
    fmt.Printf("Shipment processed: %+v\n", shipment)

    // Output:
    // Shipment processed: {OrderID:ORD123 Status:READY TrackingNum:TRACK-ORD123-1234567890}
}

并行任务执行

func ExampleTaskContext_OrderProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    // Mock order
    order := []OrderItem{
        {ProductID: "LAPTOP", Quantity: 2},
        {ProductID: "MOUSE", Quantity: 3},
    }

    taskCtx := goctx.NewTaskContext(ctx)

    // Create inventory checks for each item
    inventoryChecks := goctx.Run[[]goctx.RunFn[bool]](taskCtx,
        func() ([]goctx.RunFn[bool], error) {
            return streams.NewTransformer[OrderItem, goctx.RunFn[bool]](order).
                Transform(streams.MapItSimple(checkInventory)).
                Result()
        })

    // Run inventory checks in parallel
    _, err := goctx.RunParallel(ctx, inventoryChecks...)
    fmt.Printf("Inventory check error: %v\n", err)

    // Output:
    // Inventory check error: task 1: insufficient inventory for LAPTOP
}

好处

  1. 线程安全:所有操作都受到互斥锁的保护
  2. 错误集合:维护第一个错误和所有错误
  3. 上下文集成:与Go的上下文包一起使用
  4. 通用支持:适用于任何返回类型
  5. 并发控制:限制并行执行的内置支持

测试

以下是测试实施的方法:

func TestTaskContext(t *testing.T) {
    t.Run("handles parallel errors", func(t *testing.T) {
        ctx := NewTaskContext(context.Background())
        _, err := RunParallel(ctx,
            func() (int, error) { return 0, errors.New("error 1") },
            func() (int, error) { return 0, errors.New("error 2") },
        )
        assert.Error(t, err)
        assert.Contains(t, err.Error(), "error 1")
        assert.Contains(t, err.Error(), "error 2")
    })
}

结论

这个 TaskContext 实现提供了一个强大的解决方案,可以在 Go 中通过正确的错误处理来处理并发任务执行。当您需要执行以下操作时,它特别有用:

  • 同时执行多个任务
  • 收集所有任务的错误
  • 限制并发执行
  • 维护线程安全
  • 收集所有错误的同时跟踪第一个错误

完整代码可在 GitHub 上获取。

资源

  • Go 上下文包
  • Go 并发模式
  • Go 中的错误处理

您使用什么模式来处理 Go 中的并发任务执行?在下面的评论中分享你的想法!

  • https://x.com/mahadev_k_
  • https://in.linkedin.com/in/mahadev-k-934520223

以上是在 Go 中构建健壮的任务执行上下文的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn