Using errgroup to implement Go work pool, goroutines get stuck

王林
2024-02-08 21:09:18

This article looks at a Go worker pool built with errgroup in which the goroutines get stuck. Goroutines make concurrent processing efficient, but when one of them fails or blocks, the whole program can stall. The errgroup package helps manage a group of goroutines and report the first error, provided the surrounding code does not itself block. The question and solution below show where this goes wrong and how to fix it.

Question content


I have implemented the worker pool pattern using errgroup so that an error in any goroutine can be caught. Here is my code:

jobs := make(chan UsersInfo, totalUsers)
results := make(chan UsersInfo, totalUsers)

g, gCtx := errgroup.WithContext(ctx)

for i := 1; i <= 4; i++ {
    g.Go(func() error {
        err := workerUser(gCtx, jobs, results)
        if err != nil {
            return err
        }
        return nil
    })
}

for _, user := range usersResp {
    jobs <- user
}
close(jobs)

var usersArray []UsersInfo
for i := 1; i <= totalUsers; i++ {
    r := <-results
    usersArray = append(usersArray, r)
}

if err := g.Wait(); err != nil {
    return nil, err
}

Then the implementation of the worker function is as follows:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}

CallUserAPI returns a 403 Forbidden error. I expected g.Wait() to return that non-nil error and all goroutines to stop immediately. That is not what happens: g.Wait() is never reached.


Solution


There are several problems:

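  • The loop

    for i := 1; i <= totalUsers; i++ {
        r := <-results
        usersArray = append(usersArray, r)
    }

    waits for the workers to send one result per user. That never happens when CallUserAPI returns an error, so the loop blocks forever and g.Wait() is never reached.

  • The worker does not handle the case where jobs is closed. A receive from a closed channel returns immediately with the zero value, so once the jobs run out the worker keeps calling CallUserAPI with an empty user.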

The following code can solve these two problems:

Declare a type, specifying which users to process and where to place the results:

type job struct {
    user   UsersInfo  // the user to process
    result *UsersInfo // where the worker stores the result
}
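To illustrate why carrying a result pointer in the job works, here is a minimal sketch of the same idea using only the standard library. The names task and upperAll, and the use of strings.ToUpper in place of the API call, are assumptions made for this illustration and are not part of the original answer. Each task points at its own element of a preallocated slice, so no two goroutines write to the same memory and the results stay in input order.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// task pairs an input with a pointer to the slot that receives its output.
// (Hypothetical type, mirroring the job struct above.)
type task struct {
	in  string
	out *string
}

// upperAll fills one preallocated slot per input using a few goroutines.
func upperAll(inputs []string, workers int) []string {
	outputs := make([]string, len(inputs))
	tasks := make(chan task, len(inputs))
	for i, in := range inputs {
		tasks <- task{in: in, out: &outputs[i]}
	}
	close(tasks)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				*t.out = strings.ToUpper(t.in) // each task owns a distinct slot
			}
		}()
	}
	wg.Wait() // only after Wait returns is it safe to read outputs
	return outputs
}

func main() {
	fmt.Println(upperAll([]string{"a", "b", "c"}, 2)) // prints [A B C]
}
```

The slice must be fully allocated before the pointers are taken. Appending to it afterwards could move the backing array and leave the pointers aimed at stale memory.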

Modify the worker to use this new type. Also modify it so that it exits when jobs is closed.

func workerUser(ctx context.Context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case job, ok := <-jobs:
            if !ok {
                // No more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = CallUserAPI(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}
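The ok check in the worker relies on how Go treats a closed channel. The small sketch below shows that behavior in isolation; the helper name receiveAll is an assumption made for this example. Values still in the buffer are delivered after close, and only then does the receive report ok == false.

```go
package main

import "fmt"

// receiveAll reads from ch until it is closed and drained.
// The two-value receive reports ok == false at that point,
// which is the signal a worker needs in order to exit.
func receiveAll(ch <-chan string) []string {
	var got []string
	for {
		v, ok := <-ch
		if !ok {
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "alice"
	ch <- "bob"
	close(ch)

	// Buffered values are still delivered after close.
	fmt.Println(receiveAll(ch)) // prints [alice bob]

	// A further receive does not block: it yields the zero value and false.
	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok) // prints "" false
}
```

Ignoring the second value, as in `user, _ := <-jobs`, means the worker cannot tell a real job from the zero value produced by a closed channel.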

Glue them together in the main goroutine:

jobs := make(chan job, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.

Statement:
This article is reproduced from stackoverflow.com. If there is any infringement, please contact admin@php.cn to have it removed.