首頁 >後端開發 >Golang >多個 goroutine 從同一通道讀取

多個 goroutine 從同一通道讀取

王林
王林轉載
2024-02-09 16:30:10514瀏覽

多个 goroutine 从同一通道读取

php小編草莓在本文中將為大家介紹多個goroutine從同一通道讀取的相關內容。在並發程式設計中,goroutine是Go語言中的輕量級線程,可以同時執行多個任務。通道是goroutine之間進行溝通的重要方式。當多個goroutine需要從同一個通道讀取資料時,我們需要注意一些問題,並採取相應的措施來確保程式的正確性和效率。在接下來的內容中,我們將詳細解釋這個過程,並提供一些實用的技巧和建議。

問題內容

考慮產生多個 goroutine 以從相同通道讀取值。兩個工作人員按預期生成,但只從通道中讀取一項並停止讀取。我期望 goroutine 繼續從通道讀取數據,直到將值發送到通道的 goroutine 關閉為止。儘管某些東西阻止了發送者發送,但生成項目的 goroutine 並未關閉。為什麼每個工人只讀取一個值並停止?

輸出顯示發送的兩個值,每個工作 goroutine 各讀取一個值。第三個值已發送,但未從任何一個工作線程中讀取。

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

去遊樂場

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}

解決方法

先前關於通道未緩衝的評論是正確的,但還有其他同步問題。

無緩衝通道本質上表示寫入值時,必須在發生任何其他寫入之前接收該值。

  1. workerpool 建立一個無緩衝通道 out 來儲存結果,但只有在所有結果寫入 out 後才會回傳。但由於從out 通道的讀取發生在out 返回之後,且out 沒有緩衝,因此workerpool 在嘗試寫入時被阻塞,從而導致死鎖。這就是為什麼看起來每個工作人員只發送一個值;實際上,在發送第一個之後,所有工作人員都被阻止,因為沒有任何東西可以接收該值(您可以透過在寫入out 後移動print 語句來看到這一點)

修復選項包含讓out 有一個大小為n = 結果數 的緩衝區(即out := make(chan int, n))或使out 不緩衝並在寫入時從out 進行讀取。

  1. done 頻道也沒有被正確使用。 mainworkerpool 都依賴它來停止執行,但沒有任何內容被寫入其中!它也是無緩衝的,因此也會遇到上述死鎖問題。

要解決此問題,您首先可以從workerpool 中刪除case <-done: 並簡單地透過in 進行範圍,因為它在main 中關閉。然後可以將done設定為緩衝通道來解決死鎖。

結合這些修復可以得到:

package main

import (
    "fmt"
    "sync"
)

func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for data := range in {
                // fan-in job execution multiplexing results into the results channel
                fmt.Println("running func", data)
                value := fn(data)
                fmt.Println("sending value out", value)
                out <- value

            }
            fmt.Println("no more items")
            return
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    done <- true
    close(done)
    return out
}

func main() {
    done := make(chan bool, 1)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        }
    }

}

這可以解決您的問題,但這不是使用頻道的最佳方式!結構本身可以更改得更簡單,而不必依賴緩衝通道。

以上是多個 goroutine 從同一通道讀取的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除