ホームページ  >  記事  >  バックエンド開発  >  Go プロデューサーとコンシューマーがデッドロックを回避する

Go プロデューサーとコンシューマーがデッドロックを回避する

王林
王林転載
2024-02-05 22:48:111167ブラウズ

Go 生产者消费者避免死锁

#質問内容

コンシューマーとプロデューサーに関するコードがあります。コードレビューのためにこの質問をここで行い、アイデアの大部分はこのスレッドから得られましたが、プレイグラウンドのコードは次のとおりです。

    このコードには、同じチャネルを共有する複数のプロデューサーとコンシューマーが含まれています。
  • このコードにはエラー処理メカニズムがあり、ワーカー (プロデューサーまたはコンシューマー) がエラーを起こした場合、すべてのワーカーが停止する必要があります。
すべてのコンシューマーがダウンしているにもかかわらず、プロデューサーが共有チャネルにデータを追加しているデッドロック シナリオが心配です。この問題を「軽減」するために、データをデータキューに追加する前に、特に go プレイグラウンドの 85 行目にコンテキスト チェックを追加しました。

ただし、

プロデューサ が 85 行目で context.done() をチェックしてからコンテキストをキャンセルし、すべての コンシューマ が閉じてから した場合には、依然としてデッドロックが発生する可能性があります。プロデューサーデータをキューに挿入しようとしていますか?

そうであれば、それを軽減する方法。

コードを再投稿:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}


正解


いいえ、大丈夫です。チャネルの書き込みを

select ステートメントでラップしているため、プロデューサーの書き込みでデッドロックすることはありません。コンシューマーが終了したためにチャネル書き込みが実行できない場合でも、コンテキストキャンセル句にヒットしてプロデューサーを終了します。

概念を示すために、これを実行して、リーダーなしでチャネルに書き込もうとしているにもかかわらず、デッドロックしていないことを確認できます。

リーリー

これは

その遊び場のリンクです

コードの簡略化について。これが現実のコードであれば、おそらく

golang.org/x/sync/errgroupgroup を使用するでしょう。または、それらからヒントを得て、sync.once すべてのプロデューサーとコンシューマー をゴルーチンを生成し、エラー内でさらに使用せずにエラーを処理できる関数でラップします。 複雑なエラー チャネルの排気コード処理関数。

以上がGo プロデューサーとコンシューマーがデッドロックを回避するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はstackoverflow.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。