首頁  >  文章  >  後端開發  >  Go 生產者消費者避免死鎖

Go 生產者消費者避免死鎖

王林
王林轉載
2024-02-05 22:48:111227瀏覽

Go 生产者消费者避免死锁

問題內容

我有一個關於消費者和生產者的程式碼。雖然我在這裡提出了這個問題以進行程式碼審查,而這個想法的很大一部分是從這個線程中衍生出來的,這裡是操場上的程式碼。

  • 此程式碼有多個生產者和消費者共用相同通道。
  • 此程式碼具有錯誤處理機制,如果任何工作人員(生產者或消費者)出錯,則所有工作人員都應停止。

我擔心死鎖場景,即所有消費者都關閉,但生產者仍在向共享通道添加資料。為了「緩解」這個問題,我在將資料新增至資料佇列之前新增了上下文檢查 - 特別是 go playground 中的第 85 行。

但是,如果生產者在第85 行檢查context.done(),則仍然可能出現死鎖,然後取消上下文,導致所有消費者關閉,然後生產者嘗試將資料插入佇列?

如果是這樣如何緩解。

重新發布程式碼:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

正確答案


不,你沒問題,這不會在生產者寫入時陷入死鎖,因為你將通道寫入包裝在select 語句中,所以即使通道寫入由於消費者已終止而無法發生,你'仍然會遇到上下文取消子句並終止您的生產者。

只是為了演示這個概念,您可以運行它並看到它沒有死鎖,儘管它正在嘗試在沒有讀取器的情況下進行通道寫入。

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    select {
    case ch <- struct{}{}:
    case <-ctx.Done():
        fmt.Println("context canceled")
    }
    fmt.Println("bye!")
}

這是它的遊樂場連結

關於一些程式碼簡化。如果這是任何類型的現實生活代碼,我可能只使用 golang.org/x/sync/errgroup 中的 group 。或從他們那裡得到提示並利用sync.once用一個生成goroutine 的函數包裝所有生產者和消費者,並且可以處理錯誤,而無需在錯誤中使用更複雜的錯誤通道排出程式碼處理函數。

以上是Go 生產者消費者避免死鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除