Maison >développement back-end >Golang >Aller producteur-consommateur évite les impasses

Aller producteur-consommateur évite les impasses

王林
王林avant
2024-02-05 22:48:111260parcourir

Go 生产者消费者避免死锁

Contenu de la question

J'ai un code sur le consommateur et le producteur. Bien que j'ai posé cette question ici pour la révision du code et qu'une grande partie de l'idée soit dérivée de ce fil, voici le code dans le terrain de jeu.

  • Ce code a plusieurs producteurs et consommateurs partageant le même canal.
  • Ce code dispose d'un mécanisme de gestion des erreurs. En cas d'erreur d'un travailleur (producteur ou consommateur), tous les travailleurs doivent s'arrêter.

Je m'inquiète d'un scénario d'impasse dans lequel tous les consommateurs sont en panne mais les producteurs continuent d'ajouter des données au canal partagé. Pour "atténuer" ce problème, j'ai ajouté une vérification du contexte avant d'ajouter les données à la file d'attente de données - en particulier la ligne 85 dans le terrain de jeu Go.

Mais si le producteur vérifie context.done() à la ligne 85, est-il toujours possible de bloquer puis d'annuler le contexte, provoquant l'arrêt de tous les consommateurs, puis le producteur essaie d'insérer des données dans le file d'attente?

Si oui, comment y remédier.

Code de republication :

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

Réponse correcte


Non, vous n'avez pas de problème, cela ne se retrouvera pas dans une impasse lors de l'écriture du producteur car vous enveloppez l'écriture du canal dans l'instruction select donc même si l'écriture du canal est terminée en raison du consommateur Bien que cela ne puisse pas arriver, vous Je frapperai toujours la clause d'annulation du contexte et mettrai fin à votre producteur.

Juste pour démontrer le concept, vous pouvez l'exécuter et voir qu'il ne se bloque pas, bien qu'il essaie d'écrire sur la chaîne sans lecteur.

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    select {
    case ch <- struct{}{}:
    case <-ctx.Done():
        fmt.Println("context canceled")
    }
    fmt.Println("bye!")
}

C'est son lien vers le terrain de jeu.

À propos d'une certaine simplification du code. S'il s'agissait d'un code réel, j'utiliserais probablement simplement golang.org/x/sync/errgroup 中的 group 。或者从他们那里得到提示并利用 sync.once et pour envelopper tous les producteurs et consommateurs avec une fonction qui génère une goroutine et peut gérer les erreurs sans avoir à utiliser un code de drain de canal d'erreur plus complexe dans le fonction de traitement des erreurs.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer