Heim >Backend-Entwicklung >Golang >Durch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen

Durch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen

王林
王林nach vorne
2024-02-08 21:09:181047Durchsuche

使用 errgroup 实现 Go 工作池,goroutines 卡住

Der Herausgeber von PHP Apple stellt Ihnen heute eine Methode zur Verwendung von errgroup zur Implementierung des Go-Arbeitspools vor, die das Problem löst, dass Goroutinen stecken bleiben. Bei der gleichzeitigen Programmierung kann eine effiziente gleichzeitige Verarbeitung durch die Verwendung von Goroutinen erreicht werden. Wenn jedoch in einer bestimmten Goroutine ein Fehler auftritt oder hängen bleibt, wirkt sich dies auf die Ausführung des gesamten Programms aus. Durch die Verwendung des errgroup-Pakets können wir die Ausführung von Goroutinen elegant verwalten und Fehler behandeln, wenn Fehler auftreten, wodurch die Stabilität und Zuverlässigkeit des Programms gewährleistet wird. Werfen wir einen Blick darauf, wie dies umgesetzt wird.

Frageninhalt


Ich habe das Worker-Pool-Muster mithilfe von errgroup implementiert, damit Fehler in jeder Goroutine abgefangen werden können. Hier sind meine Daten:

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }

Dann ist die Implementierung der Worker-Funktion wie folgt:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}

calluserapi gibt einen 403 verbotenen Fehler zurück, der g.wait() aufrufen und bei einem Fehler ungleich Null alle Goroutinen sofort stoppen sollte. Aber das ist hier nicht der Fall, g.wait() wird nie aufgerufen.


Lösung


Es gibt mehrere Probleme:

  • Schleife

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }

    Warten Sie, bis die Mitarbeiter Ergebnisse für jeden Benutzer gesendet haben. Dies passiert nicht, wenn calluserapi einen Fehler zurückgibt.

  • Arbeiter bewältigt keine jobsgeschlossenen Situationen.

Der folgende Code kann diese beiden Probleme lösen:

Deklarieren Sie einen Typ, geben Sie an, welche Benutzer verarbeitet werden sollen und wo die Ergebnisse abgelegt werden sollen:

type job struct {
    user usersinfo
    result *usersinfo
}

Ändern Sie Arbeitsthreads, um diesen neuen Typ zu verwenden. Ändern Sie außerdem den Worker so, dass er beendet wird, wenn jobs geschlossen wird.

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}

Kleben Sie sie in der Haupt-Goroutine zusammen:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.

Das obige ist der detaillierte Inhalt vonDurch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen