Maison >développement back-end >Golang >En utilisant errgroup pour implémenter le pool de travail Go, les goroutines restent bloquées
L'éditeur de PHP Apple vous présente aujourd'hui une méthode d'utilisation d'errgroup pour implémenter le pool de travail Go, qui résout le problème du blocage des goroutines. En programmation simultanée, un traitement simultané efficace peut être obtenu en utilisant des goroutines, mais lorsqu'une erreur ou un blocage se produit dans une certaine goroutine, cela affectera l'exécution de l'ensemble du programme. En utilisant le package errgroup, nous pouvons gérer avec élégance l'exécution des goroutines et gérer les erreurs lorsque des erreurs se produisent, garantissant ainsi la stabilité et la fiabilité du programme. Voyons comment cela est mis en œuvre.
J'ai implémenté le modèle de pool de travailleurs en utilisant errgroup afin que les erreurs dans n'importe quelle goroutine puissent être détectées. Voici mes coordonnées :
jobs := make(chan usersinfo, totalusers) results := make(chan usersinfo, totalusers) g, gctx := errgroup.withcontext(ctx) for i := 1; i <= 4; i++ { g.go(func() error { err := workeruser(gctx, jobs, results) if err != nil { return err } return nil }) } for _, user := range usersresp { jobs <- user } close(jobs) var usersarray []usersinfo for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) } if err := g.wait(); err != nil { return nil, err }
Ensuite, la mise en œuvre de la fonction travailleur est la suivante :
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error { for { select { case <-ctx.Done(): return ctx.Err() case user, _ := <-jobs: userInfo, err := CallUserAPI(ctx, user) if err != nil { return err } results <- userInfo } } }
calluserapi renvoie une erreur interdite 403, qui devrait appeler g.wait() et devrait arrêter toutes les goroutines immédiatement en cas d'erreur non nulle. Mais ce n'est pas le cas ici, g.wait() n'est jamais appelé.
Il y a plusieurs problèmes :
Boucle
for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) }
En attente que le personnel envoie les résultats pour chaque utilisateur. Cela ne se produit pas lorsque calluserapi
renvoie une erreur.
le travailleur ne gère pas les jobs
situations fermées.
Le code suivant peut résoudre ces deux problèmes :
Déclarez un type, précisez quels utilisateurs traiter et où mettre les résultats :
type job struct { user usersinfo result *usersinfo }
Modifiez les threads de travail pour utiliser ce nouveau type. De plus, modifiez le travailleur pour qu'il se ferme lorsque jobs
est fermé.
func workeruser(ctx context.context, jobs <-chan job) error { for { select { case <-ctx.done(): return ctx.err() case job, ok := <-jobs: if !ok { // no more jobs, exit. return nil } var err error *job.result, err = calluserapi(ctx, job.user) if err != nil { return err } } } }
Collez-les ensemble dans la goroutine principale :
jobs := make(chan UsersInfo, totalUsers) usersArray := make([]UsersInfo, totalUsers) g, gCtx := errgroup.WithContext(ctx) // Start the workers. for i := 1; i <= 4; i++ { g.Go(func() error { return workerUser(gCtx, jobs) }) } // Feed the workers. for i, user := range usersResp { jobs <- job{user: user, result: &usersArray[i]} } // Close the channel to indicate that no more jobs will be sent. // The workers exit via the `if !ok { return nil }` statement. close(jobs) // Wait for the workers to complete. if err := g.Wait(); err != nil { return nil, err } // It is now safe to access the results in usersArray.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!