首頁  >  文章  >  後端開發  >  使用 errgroup 實作 Go 工作池,goroutines 卡住

使用 errgroup 實作 Go 工作池,goroutines 卡住

王林
王林轉載
2024-02-08 21:09:18984瀏覽

使用 errgroup 实现 Go 工作池,goroutines 卡住

php小編蘋果今天為大家介紹一個使用 errgroup 實作 Go 工作池的方法,解決了 goroutines 卡住的問題。在並發程式設計中,使用 goroutines 可以實現高效的並發處理,但當遇到某個 goroutine 發生錯誤或卡住時,會影響整個程式的執行。透過使用 errgroup 套件,我們可以優雅地管理 goroutines 的執行,並在其中發生錯誤時進行處理,保證程式的穩定性和可靠性。讓我們來看看具體的實現方式。

問題內容


我已經使用 errgroup 實作了工作池模式,以便可以捕獲任何 goroutine 中的錯誤。這是我的詳細資訊:

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }

然後worker函數的實作是這樣的:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}

calluserapi 傳回 403 forbidden 錯誤,該錯誤應呼叫 g.wait() 並應在出現非 nil 錯誤時立即停止所有 goroutine。但這裡的情況並非如此,g.wait() 永遠不會被呼叫。


解決方法


有幾個問題:

  • 循環

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }

    等待工作人員為每個使用者發送結果。當 calluserapi 傳回錯誤時,不會發生這種情況。

  • worker不處理jobs關閉的情況。

下面的程式碼可以解決這兩個問題:

宣告一個類型,指定要處理的使用者以及將結果放在何處:

type job struct {
    user usersinfo
    result *usersinfo
}

修改工作執行緒以使用這種新類型。另外,修改worker,使其在jobs關閉時退出。

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}

在主 goroutine 中將它們黏合在一起:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.

以上是使用 errgroup 實作 Go 工作池,goroutines 卡住的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除