我有一个关于消费者和生产者的代码。虽然我在这里提出了这个问题以进行代码审查,并且这个想法的很大一部分是从这个线程中衍生出来的,这里是操场上的代码。
我担心死锁场景,即所有消费者都关闭,但生产者仍在向共享通道添加数据。为了“缓解”这个问题,我在将数据添加到数据队列之前添加了上下文检查 - 特别是 go playground 中的第 85 行。
但是,如果生产者在第 85 行检查 context.done(),则仍然可能出现死锁,然后取消上下文,导致所有消费者关闭,然后生产者尝试将数据插入队列?
如果是这样如何缓解。
重新发布代码:
package main import ( "context" "fmt" "sync" ) func main() { a1 := []int{1, 2, 3, 4, 5} a2 := []int{5, 4, 3, 1, 1} a3 := []int{6, 7, 8, 9} a4 := []int{1, 2, 3, 4, 5} a5 := []int{5, 4, 3, 1, 1} a6 := []int{6, 7, 18, 9} arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6} ctx, cancel := context.WithCancel(context.Background()) ch1 := read(ctx, arrayOfArray) messageCh := make(chan int) errCh := make(chan error) producerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { producerWg.Add(1) producer(ctx, producerWg, ch1, messageCh, errCh) } consumerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { consumerWg.Add(1) consumer(ctx, consumerWg, messageCh, errCh) } firstError := handleAllErrors(ctx, cancel, errCh) producerWg.Wait() close(messageCh) consumerWg.Wait() close(errCh) fmt.Println(<-firstError) } func read(ctx context.Context, arrayOfArray [][]int) <-chan []int { ch := make(chan []int) go func() { defer close(ch) for i := 0; i < len(arrayOfArray); i++ { select { case <-ctx.Done(): return case ch <- arrayOfArray[i]: } } }() return ch } func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) { go func() { defer wg.Done() for { select { case <-ctx.Done(): return case arr, ok := <-in: if !ok { return } for i := 0; i < len(arr); i++ { // simulating an error. //if arr[i] == 10 { // errCh <- fmt.Errorf("producer interrupted") //} select { case <-ctx.Done(): return case messageCh <- 2 * arr[i]: } } } } }() } func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) { go func() { wg.Done() for { select { case <-ctx.Done(): return case n, ok := <-messageCh: if !ok { return } fmt.Println("consumed: ", n) // simulating erros //if n == 10 { // errCh <- fmt.Errorf("output error during write") //} } } }() } func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error { firstErrCh := make(chan error, 1) isFirstError := true go func() { defer close(firstErrCh) for err := range errCh { select { case <-ctx.Done(): default: cancel() } if isFirstError { firstErrCh <- err isFirstError = !isFirstError } } }() return firstErrCh }
不,你没问题,这不会在生产者写入时陷入死锁,因为你将通道写入包装在 select
语句中,所以即使通道写入由于消费者已终止而无法发生,你'仍然会遇到上下文取消子句并终止您的生产者。
只是为了演示这个概念,您可以运行它并看到它没有死锁,尽管它正在尝试在没有读取器的情况下进行通道写入。
package main import ( "context" "fmt" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) go func() { time.Sleep(1 * time.Second) cancel() }() select { case ch <- struct{}{}: case <-ctx.Done(): fmt.Println("context canceled") } fmt.Println("bye!") }
这是它的游乐场链接。
关于一些代码简化。如果这是任何类型的现实生活代码,我可能只使用 golang.org/x/sync/errgroup
中的 group
。或者从他们那里得到提示并利用 sync.once
和 用一个生成 goroutine 的函数包装所有生产者和消费者,并且可以处理错误,而无需在错误中使用更复杂的错误通道排出代码处理函数。
以上是Go 生产者消费者避免死锁的详细内容。更多信息请关注PHP中文网其他相关文章!