소비자와 생산자에 대한 코드가 있습니다. 코드 검토를 위해 여기에 이 질문을 했고 아이디어의 상당 부분이 이 스레드에서 파생되었지만 여기에 플레이그라운드의 코드가 있습니다.
모든 소비자가 다운되었지만 생산자는 여전히 공유 채널에 데이터를 추가하는 교착 상태 시나리오가 걱정됩니다. 이 문제를 "완화"하기 위해 데이터 큐에 데이터를 추가하기 전에 컨텍스트 확인을 추가했습니다. 특히 go Playground의 85번째 줄을 추가했습니다.
그러나 producer가 85행에서 context.done()을 확인하면 교착 상태가 발생하고 컨텍스트를 취소하여 모든 consumers가 종료되고 producer가 컨텍스트에 데이터를 삽입하려고 시도하는 것이 가능합니까? 대기줄?
그렇다면 어떻게 완화할 수 있을까요?
다시 게시 코드:
package main import ( "context" "fmt" "sync" ) func main() { a1 := []int{1, 2, 3, 4, 5} a2 := []int{5, 4, 3, 1, 1} a3 := []int{6, 7, 8, 9} a4 := []int{1, 2, 3, 4, 5} a5 := []int{5, 4, 3, 1, 1} a6 := []int{6, 7, 18, 9} arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6} ctx, cancel := context.WithCancel(context.Background()) ch1 := read(ctx, arrayOfArray) messageCh := make(chan int) errCh := make(chan error) producerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { producerWg.Add(1) producer(ctx, producerWg, ch1, messageCh, errCh) } consumerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { consumerWg.Add(1) consumer(ctx, consumerWg, messageCh, errCh) } firstError := handleAllErrors(ctx, cancel, errCh) producerWg.Wait() close(messageCh) consumerWg.Wait() close(errCh) fmt.Println(<-firstError) } func read(ctx context.Context, arrayOfArray [][]int) <-chan []int { ch := make(chan []int) go func() { defer close(ch) for i := 0; i < len(arrayOfArray); i++ { select { case <-ctx.Done(): return case ch <- arrayOfArray[i]: } } }() return ch } func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) { go func() { defer wg.Done() for { select { case <-ctx.Done(): return case arr, ok := <-in: if !ok { return } for i := 0; i < len(arr); i++ { // simulating an error. //if arr[i] == 10 { // errCh <- fmt.Errorf("producer interrupted") //} select { case <-ctx.Done(): return case messageCh <- 2 * arr[i]: } } } } }() } func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) { go func() { wg.Done() for { select { case <-ctx.Done(): return case n, ok := <-messageCh: if !ok { return } fmt.Println("consumed: ", n) // simulating erros //if n == 10 { // errCh <- fmt.Errorf("output error during write") //} } } }() } func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error { firstErrCh := make(chan error, 1) isFirstError := true go func() { defer close(firstErrCh) for err := range errCh { select { case <-ctx.Done(): default: cancel() } if isFirstError { firstErrCh <- err isFirstError = !isFirstError } } }() return firstErrCh }
아니요 문제 없습니다. 채널 쓰기를 select
문으로 감싸기 때문에 프로듀서 쓰기에 교착 상태가 발생하지 않으므로 소비자로 인해 채널 쓰기가 종료되더라도 그럴 수는 없지만, 여전히 컨텍스트 취소 조항에 도달하고 생산자를 종료합니다.
개념을 보여주기 위해 실행하면 리더 없이 채널에 쓰려고 시도하지만 교착 상태가 되지 않는 것을 확인할 수 있습니다.
으아악이것은 놀이터 링크입니다.
일부 코드 단순화에 대해. 이것이 실제 코드라면 아마도 golang.org/x/sync/errgroup
中的 group
。或者从他们那里得到提示并利用 sync.once
및 를 사용하여 모든 생산자와 소비자 를 고루틴을 생성하고 더 복잡한 오류 채널 배수 코드를 사용하지 않고도 오류를 처리할 수 있는 함수로 래핑했을 것입니다. 오류 처리 기능.
위 내용은 Go 생산자-소비자는 교착 상태를 방지합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!