php小編草莓在本文中將為大家介紹多個goroutine從同一通道讀取的相關內容。在並發程式設計中,goroutine是Go語言中的輕量級線程,可以同時執行多個任務。通道是goroutine之間進行溝通的重要方式。當多個goroutine需要從同一個通道讀取資料時,我們需要注意一些問題,並採取相應的措施來確保程式的正確性和效率。在接下來的內容中,我們將詳細解釋這個過程,並提供一些實用的技巧和建議。
考慮產生多個 goroutine 以從相同通道讀取值。兩個工作人員按預期生成,但只從通道中讀取一項並停止讀取。我期望 goroutine 繼續從通道讀取數據,直到將值發送到通道的 goroutine 關閉為止。儘管某些東西阻止了發送者發送,但生成項目的 goroutine 並未關閉。為什麼每個工人只讀取一個值並停止?
輸出顯示發送的兩個值,每個工作 goroutine 各讀取一個值。第三個值已發送,但未從任何一個工作線程中讀取。
new worker new worker waiting sending 0 sending 1 sending 2 running func 1 sending value out 1 running func 0 sending value out 0
去遊樂場
package main import ( "fmt" "sync" ) func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int { out := make(chan int) var wg sync.WaitGroup for i := 0; i < numberOfWorkers; i++ { fmt.Println("new worker") wg.Add(1) // fan out worker goroutines reading from in channel and // send output into out channel go func() { defer wg.Done() for { select { case <-done: fmt.Println("recieved done signal") return case data, ok := <-in: if !ok { fmt.Println("no more items") return } // fan-in job execution multiplexing results into the results channel fmt.Println("running func", data) value := fn(data) fmt.Println("sending value out", value) out <- value } } }() } fmt.Println("waiting") wg.Wait() fmt.Println("done waiting") close(out) return out } func main() { done := make(chan bool) defer close(done) in := make(chan int) go func() { for i := 0; i < 10; i++ { fmt.Println("sending", i) in <- i } close(in) }() out := workerPool(done, in, 2, func(i int) int { return i }) for { select { case o, ok := <-out: if !ok { continue } fmt.Println("output", o) case <-done: return default: } } }
先前關於通道未緩衝的評論是正確的,但還有其他同步問題。
無緩衝通道本質上表示寫入值時,必須在發生任何其他寫入之前接收該值。
workerpool
建立一個無緩衝通道 out
來儲存結果,但只有在所有結果寫入 out 後才會回傳。但由於從out 通道的讀取發生在out
返回之後,且out
沒有緩衝,因此workerpool
在嘗試寫入時被阻塞,從而導致死鎖。這就是為什麼看起來每個工作人員只發送一個值;實際上,在發送第一個之後,所有工作人員都被阻止,因為沒有任何東西可以接收該值(您可以透過在寫入out
後移動print 語句來看到這一點)修復選項包含讓out
有一個大小為n = 結果數
的緩衝區(即out := make(chan int, n)
)或使out
不緩衝並在寫入時從out
進行讀取。
done
頻道也沒有被正確使用。 main
和 workerpool
都依賴它來停止執行,但沒有任何內容被寫入其中!它也是無緩衝的,因此也會遇到上述死鎖問題。 要解決此問題,您首先可以從workerpool
中刪除case <-done:
並簡單地透過in
進行範圍,因為它在main
中關閉。然後可以將done
設定為緩衝通道來解決死鎖。
結合這些修復可以得到:
package main import ( "fmt" "sync" ) func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int { out := make(chan int, 100) var wg sync.WaitGroup for i := 0; i < numberOfWorkers; i++ { fmt.Println("new worker") wg.Add(1) // fan out worker goroutines reading from in channel and // send output into out channel go func() { defer wg.Done() for data := range in { // fan-in job execution multiplexing results into the results channel fmt.Println("running func", data) value := fn(data) fmt.Println("sending value out", value) out <- value } fmt.Println("no more items") return }() } fmt.Println("waiting") wg.Wait() fmt.Println("done waiting") close(out) done <- true close(done) return out } func main() { done := make(chan bool, 1) in := make(chan int) go func() { for i := 0; i < 10; i++ { fmt.Println("sending", i) in <- i } close(in) }() out := workerPool(done, in, 2, func(i int) int { return i }) for { select { case o, ok := <-out: if !ok { continue } fmt.Println("output", o) case <-done: return } } }
這可以解決您的問題,但這不是使用頻道的最佳方式!結構本身可以更改得更簡單,而不必依賴緩衝通道。
以上是多個 goroutine 從同一通道讀取的詳細內容。更多資訊請關注PHP中文網其他相關文章!