我有一個關於消費者和生產者的程式碼。雖然我在這裡提出了這個問題以進行程式碼審查,而這個想法的很大一部分是從這個線程中衍生出來的,這裡是操場上的程式碼。
我擔心死鎖場景,即所有消費者都關閉,但生產者仍在向共享通道添加資料。為了「緩解」這個問題,我在將資料新增至資料佇列之前新增了上下文檢查 - 特別是 go playground 中的第 85 行。
但是,如果生產者在第85 行檢查context.done(),則仍然可能出現死鎖,然後取消上下文,導致所有消費者關閉,然後生產者嘗試將資料插入佇列?
如果是這樣如何緩解。
重新發布程式碼:
package main import ( "context" "fmt" "sync" ) func main() { a1 := []int{1, 2, 3, 4, 5} a2 := []int{5, 4, 3, 1, 1} a3 := []int{6, 7, 8, 9} a4 := []int{1, 2, 3, 4, 5} a5 := []int{5, 4, 3, 1, 1} a6 := []int{6, 7, 18, 9} arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6} ctx, cancel := context.WithCancel(context.Background()) ch1 := read(ctx, arrayOfArray) messageCh := make(chan int) errCh := make(chan error) producerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { producerWg.Add(1) producer(ctx, producerWg, ch1, messageCh, errCh) } consumerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { consumerWg.Add(1) consumer(ctx, consumerWg, messageCh, errCh) } firstError := handleAllErrors(ctx, cancel, errCh) producerWg.Wait() close(messageCh) consumerWg.Wait() close(errCh) fmt.Println(<-firstError) } func read(ctx context.Context, arrayOfArray [][]int) <-chan []int { ch := make(chan []int) go func() { defer close(ch) for i := 0; i < len(arrayOfArray); i++ { select { case <-ctx.Done(): return case ch <- arrayOfArray[i]: } } }() return ch } func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) { go func() { defer wg.Done() for { select { case <-ctx.Done(): return case arr, ok := <-in: if !ok { return } for i := 0; i < len(arr); i++ { // simulating an error. //if arr[i] == 10 { // errCh <- fmt.Errorf("producer interrupted") //} select { case <-ctx.Done(): return case messageCh <- 2 * arr[i]: } } } } }() } func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) { go func() { wg.Done() for { select { case <-ctx.Done(): return case n, ok := <-messageCh: if !ok { return } fmt.Println("consumed: ", n) // simulating erros //if n == 10 { // errCh <- fmt.Errorf("output error during write") //} } } }() } func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error { firstErrCh := make(chan error, 1) isFirstError := true go func() { defer close(firstErrCh) for err := range errCh { select { case <-ctx.Done(): default: cancel() } if isFirstError { firstErrCh <- err isFirstError = !isFirstError } } }() return firstErrCh }
不,你沒問題,這不會在生產者寫入時陷入死鎖,因為你將通道寫入包裝在select
語句中,所以即使通道寫入由於消費者已終止而無法發生,你'仍然會遇到上下文取消子句並終止您的生產者。
只是為了演示這個概念,您可以運行它並看到它沒有死鎖,儘管它正在嘗試在沒有讀取器的情況下進行通道寫入。
package main import ( "context" "fmt" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) go func() { time.Sleep(1 * time.Second) cancel() }() select { case ch <- struct{}{}: case <-ctx.Done(): fmt.Println("context canceled") } fmt.Println("bye!") }
這是它的遊樂場連結。
關於一些程式碼簡化。如果這是任何類型的現實生活代碼,我可能只使用 golang.org/x/sync/errgroup
中的 group
。或從他們那裡得到提示並利用sync.once
和用一個生成goroutine 的函數包裝所有生產者和消費者,並且可以處理錯誤,而無需在錯誤中使用更複雜的錯誤通道排出程式碼處理函數。
以上是Go 生產者消費者避免死鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!