Heim  >  Artikel  >  Backend-Entwicklung  >  Go Producer-Consumer vermeidet Deadlocks

Go Producer-Consumer vermeidet Deadlocks

王林
王林nach vorne
2024-02-05 22:48:111231Durchsuche

Go 生产者消费者避免死锁

Frageninhalt

Ich habe einen Code über Verbraucher und Hersteller. Obwohl ich diese Frage hier zur Codeüberprüfung gestellt habe und ein großer Teil der Idee aus diesem Thread abgeleitet wurde, ist hier der Code auf dem Spielplatz.

  • Bei diesem Code teilen sich mehrere Produzenten und Verbraucher denselben Kanal.
  • Dieser Code verfügt über einen Fehlerbehandlungsmechanismus. Wenn ein Arbeiter (Produzent oder Verbraucher) einen Fehler macht, sollten alle Arbeiter anhalten.

Ich mache mir Sorgen über ein Deadlock-Szenario, bei dem alle Verbraucher ausfallen, die Produzenten aber immer noch Daten zum gemeinsamen Kanal hinzufügen. Um dieses Problem zu „mildern“, habe ich eine Kontextprüfung hinzugefügt, bevor ich die Daten zur Datenwarteschlange hinzufüge – insbesondere Zeile 85 im Go-Playground.

Aber wenn der Produzent context.done() in Zeile 85 überprüft, ist es dann immer noch möglich, den Kontext zu blockieren und dann abzubrechen, was dazu führt, dass alle Konsumenten heruntergefahren werden und der Produzent dann versucht, Daten in die einzufügen Warteschlange?

Wenn ja, wie kann man es lindern?

Repost-Code:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

Richtige Antwort


Nein, Sie haben kein Problem, dies wird beim Schreiben des Produzenten nicht in einen Deadlock geraten, weil Sie das Schreiben des Kanals in die select-Anweisung einschließen, also selbst wenn das Schreiben des Kanals aufgrund des Verbrauchers beendet wird. Das kann nicht passieren, Sie Ich werde immer noch die Kontextaufhebungsklausel treffen und Ihren Produzenten kündigen.

Nur um das Konzept zu demonstrieren: Sie können es ausführen und sehen, dass es nicht zum Stillstand kommt, obwohl versucht wird, ohne Reader in den Kanal zu schreiben.

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    select {
    case ch <- struct{}{}:
    case <-ctx.Done():
        fmt.Println("context canceled")
    }
    fmt.Println("bye!")
}

Das ist sein Spielplatz-Link.

Über eine Vereinfachung des Codes. Wenn dies irgendeine Art von echtem Code wäre, würde ich wahrscheinlich einfach golang.org/x/sync/errgroup 中的 group 。或者从他们那里得到提示并利用 sync.once und verwenden, um alle Produzenten und Konsumenten mit einer Funktion zu umhüllen, die eine Goroutine erzeugt und Fehler verarbeiten kann, ohne einen komplexeren Fehlerkanal-Drain-Code verwenden zu müssen Fehlerverarbeitungsfunktion.

Das obige ist der detaillierte Inhalt vonGo Producer-Consumer vermeidet Deadlocks. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen