首页  >  文章  >  后端开发  >  多个 goroutine 从同一通道读取

多个 goroutine 从同一通道读取

王林
王林转载
2024-02-09 16:30:10426浏览

多个 goroutine 从同一通道读取

php小编草莓在本文中将为大家介绍多个goroutine从同一通道读取的相关内容。在并发编程中,goroutine是Go语言中的轻量级线程,可以同时执行多个任务。通道是goroutine之间进行通信的重要方式。当多个goroutine需要从同一个通道读取数据时,我们需要注意一些问题,并采取相应的措施来确保程序的正确性和效率。在接下来的内容中,我们将详细解释这个过程,并提供一些实用的技巧和建议。

问题内容

考虑生成多个 goroutine 以从同一通道读取值。两个工作人员按预期生成,但只从通道中读取一项并停止读取。我期望 goroutine 继续从通道读取数据,直到将值发送到通道的 goroutine 关闭为止。尽管某些东西阻止了发送者发送,但生成项目的 goroutine 并未关闭。为什么每个工人只读取一个值并停止?

输出显示发送的两个值,每个工作 goroutine 各读取一个值。第三个值已发送,但未从任何一个工作线程中读取。

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

去游乐场

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}

解决方法

之前关于通道未缓冲的评论是正确的,但还存在其他同步问题。

无缓冲通道本质上意味着写入值时,必须在发生任何其他写入之前接收该值。

  1. workerpool 创建一个无缓冲通道 out 来存储结果,但只有在所有结果写入 out 后才返回。但由于从 out 通道的读取发生在 out 返回之后,并且 out 没有缓冲,因此 workerpool 在尝试写入时被阻塞,从而导致死锁。这就是为什么看起来每个工作人员只发送一个值;实际上,在发送第一个之后,所有工作人员都被阻止,因为没有任何东西可以接收该值(您可以通过在写入 out 后移动 print 语句来看到这一点)

修复选项包括使 out 有一个大小为 n = 结果数 的缓冲区(即 out := make(chan int, n))或使 out 不缓冲并在写入时从 out 进行读取。

  1. done 频道也没有被正确使用。 mainworkerpool 都依赖它来停止执行,但没有任何内容被写入其中!它也是无缓冲的,因此也会遇到上述死锁问题。

要解决此问题,您首先可以从 workerpool 中删除 case <-done: 并简单地通过 in 进行范围,因为它在 main 中关闭。然后可以将done设置为缓冲通道来解决死锁。

结合这些修复可以得到:

package main

import (
    "fmt"
    "sync"
)

func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for data := range in {
                // fan-in job execution multiplexing results into the results channel
                fmt.Println("running func", data)
                value := fn(data)
                fmt.Println("sending value out", value)
                out <- value

            }
            fmt.Println("no more items")
            return
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    done <- true
    close(done)
    return out
}

func main() {
    done := make(chan bool, 1)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        }
    }

}

这可以解决您的问题,但这不是使用频道的最佳方式!结构本身可以更改得更简单,而不必依赖缓冲通道。

以上是多个 goroutine 从同一通道读取的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文转载于:stackoverflow.com。如有侵权,请联系admin@php.cn删除