Maison > Article > développement back-end > Aller producteur-consommateur évite les impasses
J'ai un code sur le consommateur et le producteur. Bien que j'ai posé cette question ici pour la révision du code et qu'une grande partie de l'idée soit dérivée de ce fil, voici le code dans le terrain de jeu.
Je m'inquiète d'un scénario d'impasse dans lequel tous les consommateurs sont en panne mais les producteurs continuent d'ajouter des données au canal partagé. Pour "atténuer" ce problème, j'ai ajouté une vérification du contexte avant d'ajouter les données à la file d'attente de données - en particulier la ligne 85 dans le terrain de jeu Go.
Mais si le producteur vérifie context.done() à la ligne 85, est-il toujours possible de bloquer puis d'annuler le contexte, provoquant l'arrêt de tous les consommateurs, puis le producteur essaie d'insérer des données dans le file d'attente?
Si oui, comment y remédier.
Code de republication :
package main import ( "context" "fmt" "sync" ) func main() { a1 := []int{1, 2, 3, 4, 5} a2 := []int{5, 4, 3, 1, 1} a3 := []int{6, 7, 8, 9} a4 := []int{1, 2, 3, 4, 5} a5 := []int{5, 4, 3, 1, 1} a6 := []int{6, 7, 18, 9} arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6} ctx, cancel := context.WithCancel(context.Background()) ch1 := read(ctx, arrayOfArray) messageCh := make(chan int) errCh := make(chan error) producerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { producerWg.Add(1) producer(ctx, producerWg, ch1, messageCh, errCh) } consumerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { consumerWg.Add(1) consumer(ctx, consumerWg, messageCh, errCh) } firstError := handleAllErrors(ctx, cancel, errCh) producerWg.Wait() close(messageCh) consumerWg.Wait() close(errCh) fmt.Println(<-firstError) } func read(ctx context.Context, arrayOfArray [][]int) <-chan []int { ch := make(chan []int) go func() { defer close(ch) for i := 0; i < len(arrayOfArray); i++ { select { case <-ctx.Done(): return case ch <- arrayOfArray[i]: } } }() return ch } func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) { go func() { defer wg.Done() for { select { case <-ctx.Done(): return case arr, ok := <-in: if !ok { return } for i := 0; i < len(arr); i++ { // simulating an error. //if arr[i] == 10 { // errCh <- fmt.Errorf("producer interrupted") //} select { case <-ctx.Done(): return case messageCh <- 2 * arr[i]: } } } } }() } func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) { go func() { wg.Done() for { select { case <-ctx.Done(): return case n, ok := <-messageCh: if !ok { return } fmt.Println("consumed: ", n) // simulating erros //if n == 10 { // errCh <- fmt.Errorf("output error during write") //} } } }() } func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error { firstErrCh := make(chan error, 1) isFirstError := true go func() { defer close(firstErrCh) for err := range errCh { select { case <-ctx.Done(): default: cancel() } if isFirstError { firstErrCh <- err isFirstError = !isFirstError } } }() return firstErrCh }
Non, vous n'avez pas de problème, cela ne se retrouvera pas dans une impasse lors de l'écriture du producteur car vous enveloppez l'écriture du canal dans l'instruction select
donc même si l'écriture du canal est terminée en raison du consommateur Bien que cela ne puisse pas arriver, vous Je frapperai toujours la clause d'annulation du contexte et mettrai fin à votre producteur.
Juste pour démontrer le concept, vous pouvez l'exécuter et voir qu'il ne se bloque pas, bien qu'il essaie d'écrire sur la chaîne sans lecteur.
package main import ( "context" "fmt" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) go func() { time.Sleep(1 * time.Second) cancel() }() select { case ch <- struct{}{}: case <-ctx.Done(): fmt.Println("context canceled") } fmt.Println("bye!") }
C'est son lien vers le terrain de jeu.
À propos d'une certaine simplification du code. S'il s'agissait d'un code réel, j'utiliserais probablement simplement golang.org/x/sync/errgroup
中的 group
。或者从他们那里得到提示并利用 sync.once
et pour envelopper tous les producteurs et consommateurs avec une fonction qui génère une goroutine et peut gérer les erreurs sans avoir à utiliser un code de drain de canal d'erreur plus complexe dans le fonction de traitement des erreurs.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!