php小编苹果今天为大家介绍一个使用 errgroup 实现 Go 工作池的方法,解决了 goroutines 卡住的问题。在并发编程中,使用 goroutines 可以实现高效的并发处理,但当遇到某个 goroutine 发生错误或卡住时,会影响整个程序的执行。通过使用 errgroup 包,我们可以优雅地管理 goroutines 的执行,并在其中发生错误时进行处理,保证程序的稳定性和可靠性。让我们来看看具体的实现方式。
我已经使用 errgroup 实现了工作池模式,以便可以捕获任何 goroutine 中的错误。这是我的详细信息:
jobs := make(chan usersinfo, totalusers) results := make(chan usersinfo, totalusers) g, gctx := errgroup.withcontext(ctx) for i := 1; i <= 4; i++ { g.go(func() error { err := workeruser(gctx, jobs, results) if err != nil { return err } return nil }) } for _, user := range usersresp { jobs <- user } close(jobs) var usersarray []usersinfo for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) } if err := g.wait(); err != nil { return nil, err }
然后worker函数的实现是这样的:
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error { for { select { case <-ctx.Done(): return ctx.Err() case user, _ := <-jobs: userInfo, err := CallUserAPI(ctx, user) if err != nil { return err } results <- userInfo } } }
calluserapi 返回 403 forbidden 错误,该错误应调用 g.wait() 并应在出现非 nil 错误时立即停止所有 goroutine。但这里的情况并非如此,g.wait() 永远不会被调用。
有几个问题:
循环
for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) }
等待工作人员为每个用户发送结果。当 calluserapi
返回错误时,不会发生这种情况。
worker不处理jobs
关闭的情况。
下面的代码可以解决这两个问题:
声明一个类型,指定要处理的用户以及将结果放在何处:
type job struct { user usersinfo result *usersinfo }
修改工作线程以使用这种新类型。另外,修改worker,使其在jobs
关闭时退出。
func workeruser(ctx context.context, jobs <-chan job) error { for { select { case <-ctx.done(): return ctx.err() case job, ok := <-jobs: if !ok { // no more jobs, exit. return nil } var err error *job.result, err = calluserapi(ctx, job.user) if err != nil { return err } } } }
在主 goroutine 中将它们粘合在一起:
jobs := make(chan UsersInfo, totalUsers) usersArray := make([]UsersInfo, totalUsers) g, gCtx := errgroup.WithContext(ctx) // Start the workers. for i := 1; i <= 4; i++ { g.Go(func() error { return workerUser(gCtx, jobs) }) } // Feed the workers. for i, user := range usersResp { jobs <- job{user: user, result: &usersArray[i]} } // Close the channel to indicate that no more jobs will be sent. // The workers exit via the `if !ok { return nil }` statement. close(jobs) // Wait for the workers to complete. if err := g.Wait(); err != nil { return nil, err } // It is now safe to access the results in usersArray.
以上是使用 errgroup 实现 Go 工作池,goroutines 卡住的详细内容。更多信息请关注PHP中文网其他相关文章!