Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Go pengeluar-pengguna mengelakkan kebuntuan

Go pengeluar-pengguna mengelakkan kebuntuan

王林
王林ke hadapan
2024-02-05 22:48:111205semak imbas

Go 生产者消费者避免死锁

Kandungan soalan

Saya mempunyai kod tentang pengguna dan pengeluar. Walaupun saya bertanya soalan ini di sini untuk semakan kod, dan sebahagian besar idea itu diperoleh daripada benang ini, berikut ialah kod di taman permainan.

  • Kod ini mempunyai berbilang pengeluar dan pengguna berkongsi saluran yang sama.
  • Kod ini mempunyai mekanisme pengendalian ralat, jika ada kesilapan pekerja (pengeluar atau pengguna), semua pekerja harus berhenti.

Saya bimbang tentang senario kebuntuan di mana semua pengguna gagal tetapi pengeluar masih menambah data pada saluran kongsi. Untuk "mengurangkan" masalah ini, saya menambah semakan konteks sebelum menambah data pada baris gilir data - khususnya baris 85 di taman permainan pergi.

Tetapi jika producer menyemak context.done() pada baris 85, adakah masih mungkin untuk menemui jalan buntu dan kemudian membatalkan konteks, menyebabkan semua pengguna ditutup, dan kemudian producer cuba memasukkan data ke dalam beratur?

Jika ya, bagaimana untuk mengurangkannya.

Kod pos semula:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

Jawapan betul


Tidak, anda tiada masalah, ini tidak akan menemui jalan buntu pada penulisan penerbit kerana anda membungkus saluran tulis dalam kenyataan select jadi walaupun penulisan saluran ditamatkan kerana pengguna Walaupun itu tidak boleh berlaku, anda akan tetap menekan klausa pembatalan konteks dan menamatkan penerbit anda.

Hanya untuk menunjukkan konsep, anda boleh menjalankannya dan melihat bahawa ia tidak menemui jalan buntu, walaupun ia cuba menulis ke saluran tanpa pembaca.

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    select {
    case ch <- struct{}{}:
    case <-ctx.Done():
        fmt.Println("context canceled")
    }
    fmt.Println("bye!")
}

Ini ialah pautan taman permainannya.

Mengenai beberapa pemudahan kod. Jika ini adalah sebarang jenis kod kehidupan sebenar, saya mungkin hanya akan menggunakan golang.org/x/sync/errgroup 中的 group 。或者从他们那里得到提示并利用 sync.once dan untuk membalut semua pengeluar dan pengguna dengan fungsi yang menghasilkan goroutine dan boleh menangani ralat tanpa perlu menggunakan kod saluran saluran ralat yang lebih kompleks dalam fungsi pemprosesan ralat.

Atas ialah kandungan terperinci Go pengeluar-pengguna mengelakkan kebuntuan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam