>  기사  >  백엔드 개발  >  Go 생산자-소비자는 교착 상태를 방지합니다.

Go 생산자-소비자는 교착 상태를 방지합니다.

王林
王林앞으로
2024-02-05 22:48:111205검색

Go 生产者消费者避免死锁

질문 내용

소비자와 생산자에 대한 코드가 있습니다. 코드 검토를 위해 여기에 이 ​​질문을 했고 아이디어의 상당 부분이 이 스레드에서 파생되었지만 여기에 플레이그라운드의 코드가 있습니다.

  • 이 코드에는 동일한 채널을 공유하는 여러 생산자와 소비자가 있습니다.
  • 이 코드에는 오류 처리 메커니즘이 있습니다. 작업자(생산자 또는 소비자) 오류가 발생하면 모든 작업자가 중지되어야 합니다.

모든 소비자가 다운되었지만 생산자는 여전히 공유 채널에 데이터를 추가하는 교착 상태 시나리오가 걱정됩니다. 이 문제를 "완화"하기 위해 데이터 큐에 데이터를 추가하기 전에 컨텍스트 확인을 추가했습니다. 특히 go Playground의 85번째 줄을 추가했습니다.

그러나 producer가 85행에서 context.done()을 확인하면 교착 상태가 발생하고 컨텍스트를 취소하여 모든 consumers가 종료되고 producer가 컨텍스트에 데이터를 삽입하려고 시도하는 것이 가능합니까? 대기줄?

그렇다면 어떻게 완화할 수 있을까요?

다시 게시 코드:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

정답


아니요 문제 없습니다. 채널 쓰기를 select 문으로 감싸기 때문에 프로듀서 쓰기에 교착 상태가 발생하지 않으므로 소비자로 인해 채널 쓰기가 종료되더라도 그럴 수는 없지만, 여전히 컨텍스트 취소 조항에 도달하고 생산자를 종료합니다.

개념을 보여주기 위해 실행하면 리더 없이 채널에 쓰려고 시도하지만 교착 상태가 되지 않는 것을 확인할 수 있습니다.

으아악

이것은 놀이터 링크입니다.

일부 코드 단순화에 대해. 이것이 실제 코드라면 아마도 golang.org/x/sync/errgroup 中的 group 。或者从他们那里得到提示并利用 sync.once를 사용하여 모든 생산자와 소비자 를 고루틴을 생성하고 더 복잡한 오류 채널 배수 코드를 사용하지 않고도 오류를 처리할 수 있는 함수로 래핑했을 것입니다. 오류 처리 기능.

위 내용은 Go 생산자-소비자는 교착 상태를 방지합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 stackoverflow.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제