Maison >développement back-end >Golang >Créer un contexte d'exécution de tâches robuste dans Go

Créer un contexte d'exécution de tâches robuste dans Go

Susan Sarandon
Susan Sarandonoriginal
2025-01-01 01:02:10320parcourir

Building a Robust Task Execution Context in Go

Cela pourrait être ma dernière vision de la gestion des erreurs en go. Je pense que c'est aussi le meilleur. Nous savons que chaque instruction que nous exécutons est dans un contexte. Et le contexte peut contenir des erreurs. C'est à ce moment-là que j'ai pensé pourquoi ne pas simplement créer un wrapper au-dessus du contexte actuel. Ainsi, toute la tâche si elle est exécutée via un fn spécifique, nous pourrions éventuellement vérifier si le ctx a une erreur et si c'est le cas, ne pas exécuter sinon exécuter et collecter l'erreur. Cela pourrait devenir un anti-modèle, mais oui, en attendant, nous pouvons essayer de jouer.

Eh bien, le curseur avait peu de choses à ajouter ->

Le problème

Considérez ces défis courants lorsque vous traitez des tâches simultanées :

  1. Collecte des erreurs de plusieurs goroutines
  2. Maintenir la sécurité des threads
  3. Limiter les exécutions simultanées
  4. Préserver la première erreur tout en collectant toutes les erreurs
  5. Modèles de gestion des erreurs propres

La solution : TaskContext

Créons un TaskContext qui résout ces problèmes :

package taskctx

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

type RunFn[T any] func() (T, error)

type TaskContext struct {
    context.Context
    mu       sync.RWMutex
    err      error
    multiErr []error
}

func NewTaskContext(parent context.Context) *TaskContext {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    return &TaskContext{Context: parent}
}

Principales fonctionnalités

1. Gestion des erreurs Thread-Safe

func (c *TaskContext) WithError(err error) *TaskContext {
    if err == nil {
        return c
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    c.multiErr = append(c.multiErr, err)
    if c.err == nil {
        c.err = err
    } else {
        c.err = errors.Join(c.err, err)
    }
    return c
}

2. Exécution d'une tâche unique

func Run[T any](ctx *TaskContext, fn RunFn[T]) T {
    var zero T
    if err := ctx.Err(); err != nil {
        return zero
    }

    result, err := fn()
    if err != nil {
        ctx.WithError(err)
        return zero
    }
    return result
}

3. Exécution de tâches parallèles

func RunParallel[T any](ctx *TaskContext, fns ...func() (T, error)) ([]T, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    results := make([]T, len(fns))
    var resultsMu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(len(fns))

    for i, fn := range fns {
        i, fn := i, fn
        go func() {
            defer wg.Done()
            result, err := fn()
            if err != nil {
                ctx.AddError(fmt.Errorf("task %d: %w", i+1, err))
            } else {
                resultsMu.Lock()
                results[i] = result
                resultsMu.Unlock()
            }
        }()
    }

    wg.Wait()
    return results, ctx.Errors()
}

4. Concurrence contrôlée

func RunParallelWithLimit[T any](ctx *TaskContext, limit int, fns ...func() (T, error)) ([]T, error) {
    // ... similar to RunParallel but with semaphore ...
    sem := make(chan struct{}, limit)
    // ... implementation ...
}

Exemples d'utilisation

Exécution de tâches simple

func ExampleTaskContext_ShipmentProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    order := dummyOrder()
    shipment := dummyShipment()

    // Step 1: Validate address
    // Step 2: Calculate shipping cost
    // Step 3: Generate label
    _ = goctx.Run(ctx, validateAddress("123 Main St"))
    cost := goctx.Run(ctx, calculateShipping(order))
    trackingNum := goctx.Run(ctx, generateLabel(shipment.OrderID, cost))

    if ctx.Err() != nil {
        fmt.Printf("Error: %v\n", ctx.Err())
        return
    }

    shipment.Status = "READY"
    shipment.TrackingNum = trackingNum
    fmt.Printf("Shipment processed: %+v\n", shipment)

    // Output:
    // Shipment processed: {OrderID:ORD123 Status:READY TrackingNum:TRACK-ORD123-1234567890}
}

Exécution de tâches parallèles

func ExampleTaskContext_OrderProcessing() {
    ctx := goctx.NewTaskContext(context.Background())

    // Mock order
    order := []OrderItem{
        {ProductID: "LAPTOP", Quantity: 2},
        {ProductID: "MOUSE", Quantity: 3},
    }

    taskCtx := goctx.NewTaskContext(ctx)

    // Create inventory checks for each item
    inventoryChecks := goctx.Run[[]goctx.RunFn[bool]](taskCtx,
        func() ([]goctx.RunFn[bool], error) {
            return streams.NewTransformer[OrderItem, goctx.RunFn[bool]](order).
                Transform(streams.MapItSimple(checkInventory)).
                Result()
        })

    // Run inventory checks in parallel
    _, err := goctx.RunParallel(ctx, inventoryChecks...)
    fmt.Printf("Inventory check error: %v\n", err)

    // Output:
    // Inventory check error: task 1: insufficient inventory for LAPTOP
}

Avantages

  1. Thread Safety : Toutes les opérations sont protégées par des mutex
  2. Collection d'erreurs : conserve à la fois la première erreur et toutes les erreurs
  3. Intégration de contexte : fonctionne avec le package de contexte de Go
  4. Support générique : fonctionne avec n'importe quel type de retour
  5. Contrôle de la concurrence : prise en charge intégrée pour limiter les exécutions parallèles

Essai

Voici comment tester la mise en œuvre :

func TestTaskContext(t *testing.T) {
    t.Run("handles parallel errors", func(t *testing.T) {
        ctx := NewTaskContext(context.Background())
        _, err := RunParallel(ctx,
            func() (int, error) { return 0, errors.New("error 1") },
            func() (int, error) { return 0, errors.New("error 2") },
        )
        assert.Error(t, err)
        assert.Contains(t, err.Error(), "error 1")
        assert.Contains(t, err.Error(), "error 2")
    })
}

Conclusion

Cette implémentation de TaskContext fournit une solution robuste pour gérer l'exécution de tâches simultanées avec une gestion appropriée des erreurs dans Go. C'est particulièrement utile lorsque vous avez besoin de :

  • Exécuter plusieurs tâches simultanément
  • Collecter les erreurs de toutes les tâches
  • Limiter les exécutions simultanées
  • Maintenir la sécurité des threads
  • Gardez une trace de la première erreur tout en collectant toutes les erreurs

Le code complet est disponible sur GitHub.

Ressources

  • Pack Contexte Go
  • Allez aux modèles de concurrence
  • Gestion des erreurs dans Go

Quels modèles utilisez-vous pour gérer l'exécution de tâches simultanées dans Go ? Partagez vos réflexions dans les commentaires ci-dessous !

  • https://x.com/mahadev_k_
  • https://in.linkedin.com/in/mahadev-k-934520223

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn