Maison >développement back-end >Golang >En utilisant errgroup pour implémenter le pool de travail Go, les goroutines restent bloquées

En utilisant errgroup pour implémenter le pool de travail Go, les goroutines restent bloquées

王林
王林avant
2024-02-08 21:09:181025parcourir

使用 errgroup 实现 Go 工作池,goroutines 卡住

L'éditeur de PHP Apple vous présente aujourd'hui une méthode d'utilisation d'errgroup pour implémenter le pool de travail Go, qui résout le problème du blocage des goroutines. En programmation simultanée, un traitement simultané efficace peut être obtenu en utilisant des goroutines, mais lorsqu'une erreur ou un blocage se produit dans une certaine goroutine, cela affectera l'exécution de l'ensemble du programme. En utilisant le package errgroup, nous pouvons gérer avec élégance l'exécution des goroutines et gérer les erreurs lorsque des erreurs se produisent, garantissant ainsi la stabilité et la fiabilité du programme. Voyons comment cela est mis en œuvre.

Contenu de la question


J'ai implémenté le modèle de pool de travailleurs en utilisant errgroup afin que les erreurs dans n'importe quelle goroutine puissent être détectées. Voici mes coordonnées :

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }

Ensuite, la mise en œuvre de la fonction travailleur est la suivante :

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}

calluserapi renvoie une erreur interdite 403, qui devrait appeler g.wait() et devrait arrêter toutes les goroutines immédiatement en cas d'erreur non nulle. Mais ce n'est pas le cas ici, g.wait() n'est jamais appelé.


Solution


Il y a plusieurs problèmes :

  • Boucle

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }

    En attente que le personnel envoie les résultats pour chaque utilisateur. Cela ne se produit pas lorsque calluserapi renvoie une erreur.

  • le travailleur ne gère pas les jobssituations fermées.

Le code suivant peut résoudre ces deux problèmes :

Déclarez un type, précisez quels utilisateurs traiter et où mettre les résultats :

type job struct {
    user usersinfo
    result *usersinfo
}

Modifiez les threads de travail pour utiliser ce nouveau type. De plus, modifiez le travailleur pour qu'il se ferme lorsque jobs est fermé.

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}

Collez-les ensemble dans la goroutine principale :

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer