ホームページ >バックエンド開発 >Golang >Go で堅牢なタス​​ク実行コンテキストを構築する

Go で堅牢なタス​​ク実行コンテキストを構築する

Susan Sarandon
Susan Sarandonオリジナル
2025-01-01 01:02:10285ブラウズ

Building a Robust Task Execution Context in Go

これは、Go でのエラー処理に関する私の最後の見解になるかもしれません。私もこれが一番良いと思います。私たちは、実行するすべての命令がコンテキスト内にあることを知っています。また、コンテキストにはエラーが含まれる可能性があります。このとき、現在のコンテキストの上に単純にラッパーを作成したらどうだろうと考えました。したがって、すべてのタスクが特定の fn 経由で実行された場合、ctx にエラーがあるかどうかを確認し、エラーがある場合は実行しないで、実行してエラーを収集することができます。これはアンチパターンになるかもしれませんが、そうなるまでは試してみることができます。

まあ、カーソルには追加するものがほとんどありませんでした ->

問題

同時タスクを扱うときは、次の一般的な課題を考慮してください。

  1. 複数のゴルーチンからのエラーの収集
  2. スレッドの安全性を維持する
  3. 同時実行の制限
  4. すべてのエラーを収集しながら最初のエラーを保持する
  5. クリーンなエラー処理パターン

解決策: TaskContext

これらの問題を解決する TaskContext を構築しましょう:

package taskctx

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

type RunFn[T any] func() (T, error)

type TaskContext struct {
    context.Context
    mu       sync.RWMutex
    err      error
    multiErr []error
}

func NewTaskContext(parent context.Context) *TaskContext {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    return &TaskContext{Context: parent}
}

主な特長

1. スレッドセーフなエラー処理

func (c *TaskContext) WithError(err error) *TaskContext {
    if err == nil {
        return c
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    c.multiErr = append(c.multiErr, err)
    if c.err == nil {
        c.err = err
    } else {
        c.err = errors.Join(c.err, err)
    }
    return c
}

2. 単一タスクの実行

func Run[T any](ctx *TaskContext, fn RunFn[T]) T {
    var zero T
    if err := ctx.Err(); err != nil {
        return zero
    }

    result, err := fn()
    if err != nil {
        ctx.WithError(err)
        return zero
    }
    return result
}

3. タスクの並列実行

func RunParallel[T any](ctx *TaskContext, fns ...func() (T, error)) ([]T, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    results := make([]T, len(fns))
    var resultsMu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(len(fns))

    for i, fn := range fns {
        i, fn := i, fn
        go func() {
            defer wg.Done()
            result, err := fn()
            if err != nil {
                ctx.AddError(fmt.Errorf("task %d: %w", i+1, err))
            } else {
                resultsMu.Lock()
                results[i] = result
                resultsMu.Unlock()
            }
        }()
    }

    wg.Wait()
    return results, ctx.Errors()
}

4. 同時実行性の制御

func RunParallelWithLimit[T any](ctx *TaskContext, limit int, fns ...func() (T, error)) ([]T, error) {
    // ... similar to RunParallel but with semaphore ...
    sem := make(chan struct{}, limit)
    // ... implementation ...
}

使用例

単純なタスクの実行

func ExampleTaskContext_ShipmentProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    order := dummyOrder()
    shipment := dummyShipment()

    // Step 1: Validate address
    // Step 2: Calculate shipping cost
    // Step 3: Generate label
    _ = goctx.Run(ctx, validateAddress("123 Main St"))
    cost := goctx.Run(ctx, calculateShipping(order))
    trackingNum := goctx.Run(ctx, generateLabel(shipment.OrderID, cost))

    if ctx.Err() != nil {
        fmt.Printf("Error: %v\n", ctx.Err())
        return
    }

    shipment.Status = "READY"
    shipment.TrackingNum = trackingNum
    fmt.Printf("Shipment processed: %+v\n", shipment)

    // Output:
    // Shipment processed: {OrderID:ORD123 Status:READY TrackingNum:TRACK-ORD123-1234567890}
}

タスクの並列実行

func ExampleTaskContext_OrderProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    // Mock order
    order := []OrderItem{
        {ProductID: "LAPTOP", Quantity: 2},
        {ProductID: "MOUSE", Quantity: 3},
    }

    taskCtx := goctx.NewTaskContext(ctx)

    // Create inventory checks for each item
    inventoryChecks := goctx.Run[[]goctx.RunFn[bool]](taskCtx,
        func() ([]goctx.RunFn[bool], error) {
            return streams.NewTransformer[OrderItem, goctx.RunFn[bool]](order).
                Transform(streams.MapItSimple(checkInventory)).
                Result()
        })

    // Run inventory checks in parallel
    _, err := goctx.RunParallel(ctx, inventoryChecks...)
    fmt.Printf("Inventory check error: %v\n", err)

    // Output:
    // Inventory check error: task 1: insufficient inventory for LAPTOP
}

利点

  1. スレッド セーフティ: すべての操作はミューテックスによって保護されています
  2. エラー収集: 最初のエラーとすべてのエラーの両方を保持します
  3. コンテキスト統合: Go のコンテキスト パッケージと連携します
  4. 汎用サポート: 任意の戻り値の型で動作します
  5. 同時実行制御: 並列実行を制限するための組み込みサポート

テスト

実装をテストする方法は次のとおりです:

func TestTaskContext(t *testing.T) {
    t.Run("handles parallel errors", func(t *testing.T) {
        ctx := NewTaskContext(context.Background())
        _, err := RunParallel(ctx,
            func() (int, error) { return 0, errors.New("error 1") },
            func() (int, error) { return 0, errors.New("error 2") },
        )
        assert.Error(t, err)
        assert.Contains(t, err.Error(), "error 1")
        assert.Contains(t, err.Error(), "error 2")
    })
}

結論

この TaskContext 実装は、Go で適切なエラー処理を行って同時タスクの実行を処理するための堅牢なソリューションを提供します。これは、次のような場合に特に役立ちます。

  • 複数のタスクを同時に実行します
  • すべてのタスクからエラーを収集します
  • 同時実行を制限する
  • スレッドの安全性を維持する
  • すべてのエラーを収集しながら最初のエラーを追跡します

完全なコードは GitHub で入手できます。

リソース

  • Go コンテキスト パッケージ
  • Go 同時実行パターン
  • Go でのエラー処理

Go での同時タスク実行の処理にはどのようなパターンを使用しますか?以下のコメント欄でご意見を共有してください!

  • https://x.com/mahadev_k_
  • https://in.linkedin.com/in/mahadev-k-934520223

以上がGo で堅牢なタス​​ク実行コンテキストを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。