php小編蘋果今天為大家介紹一個使用 errgroup 實作 Go 工作池的方法,解決了 goroutines 卡住的問題。在並發程式設計中,使用 goroutines 可以實現高效的並發處理,但當遇到某個 goroutine 發生錯誤或卡住時,會影響整個程式的執行。透過使用 errgroup 套件,我們可以優雅地管理 goroutines 的執行,並在其中發生錯誤時進行處理,保證程式的穩定性和可靠性。讓我們來看看具體的實現方式。
我已經使用 errgroup 實作了工作池模式,以便可以捕獲任何 goroutine 中的錯誤。這是我的詳細資訊:
jobs := make(chan usersinfo, totalusers) results := make(chan usersinfo, totalusers) g, gctx := errgroup.withcontext(ctx) for i := 1; i <= 4; i++ { g.go(func() error { err := workeruser(gctx, jobs, results) if err != nil { return err } return nil }) } for _, user := range usersresp { jobs <- user } close(jobs) var usersarray []usersinfo for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) } if err := g.wait(); err != nil { return nil, err }
然後worker函數的實作是這樣的:
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error { for { select { case <-ctx.Done(): return ctx.Err() case user, _ := <-jobs: userInfo, err := CallUserAPI(ctx, user) if err != nil { return err } results <- userInfo } } }
calluserapi 傳回 403 forbidden 錯誤,該錯誤應呼叫 g.wait() 並應在出現非 nil 錯誤時立即停止所有 goroutine。但這裡的情況並非如此,g.wait() 永遠不會被呼叫。
有幾個問題:
循環
for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) }
等待工作人員為每個使用者發送結果。當 calluserapi
傳回錯誤時,不會發生這種情況。
worker不處理jobs
關閉的情況。
下面的程式碼可以解決這兩個問題:
宣告一個類型,指定要處理的使用者以及將結果放在何處:
type job struct { user usersinfo result *usersinfo }
修改工作執行緒以使用這種新類型。另外,修改worker,使其在jobs
關閉時退出。
func workeruser(ctx context.context, jobs <-chan job) error { for { select { case <-ctx.done(): return ctx.err() case job, ok := <-jobs: if !ok { // no more jobs, exit. return nil } var err error *job.result, err = calluserapi(ctx, job.user) if err != nil { return err } } } }
在主 goroutine 中將它們黏合在一起:
jobs := make(chan UsersInfo, totalUsers) usersArray := make([]UsersInfo, totalUsers) g, gCtx := errgroup.WithContext(ctx) // Start the workers. for i := 1; i <= 4; i++ { g.Go(func() error { return workerUser(gCtx, jobs) }) } // Feed the workers. for i, user := range usersResp { jobs <- job{user: user, result: &usersArray[i]} } // Close the channel to indicate that no more jobs will be sent. // The workers exit via the `if !ok { return nil }` statement. close(jobs) // Wait for the workers to complete. if err := g.Wait(); err != nil { return nil, err } // It is now safe to access the results in usersArray.
以上是使用 errgroup 實作 Go 工作池,goroutines 卡住的詳細內容。更多資訊請關注PHP中文網其他相關文章!