Rumah >pembangunan bahagian belakang >Golang >Berbilang goroutine membaca dari saluran yang sama

Berbilang goroutine membaca dari saluran yang sama

王林
王林ke hadapan
2024-02-09 16:30:10515semak imbas

多个 goroutine 从同一通道读取

editor php Strawberry akan memperkenalkan kepada anda kandungan yang berkaitan dengan pelbagai bacaan goroutine dari saluran yang sama dalam artikel ini. Dalam pengaturcaraan serentak, goroutine ialah utas ringan dalam bahasa Go yang boleh melaksanakan berbilang tugas pada masa yang sama. Saluran ialah cara penting untuk berkomunikasi antara goroutine. Apabila berbilang goroutine perlu membaca data dari saluran yang sama, kita perlu memberi perhatian kepada beberapa isu dan mengambil langkah yang sepadan untuk memastikan ketepatan dan kecekapan program. Dalam perkara berikut, kami akan menerangkan proses secara terperinci dan memberikan beberapa petua dan nasihat praktikal.

Kandungan soalan

Pertimbangkan untuk melahirkan berbilang goroutin untuk membaca nilai dari saluran yang sama. Kedua-dua pekerja dijana seperti yang diharapkan, tetapi hanya membaca satu item daripada saluran dan berhenti membaca. Saya menjangkakan goroutine akan terus membaca data dari saluran sehingga goroutine yang menghantar nilai ke saluran ditutup. Walaupun ada sesuatu yang menghalang pengirim daripada menghantar, goroutine yang melahirkan projek itu tidak ditutup. Mengapa setiap pekerja hanya membaca satu nilai dan berhenti?

Output menunjukkan dua nilai yang dihantar, satu dibaca oleh setiap goroutine pekerja. Nilai ketiga dihantar tetapi tidak dibaca dari mana-mana benang pekerja.

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

Pergi ke taman permainan

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}

Penyelesaian

Ulasan sebelumnya tentang saluran tidak ditimbal adalah betul, tetapi terdapat isu penyegerakan lain.

Saluran tidak buffer pada dasarnya bermakna apabila nilai ditulis, nilai itu mesti diterima sebelum sebarang penulisan lain boleh berlaku.

  1. workerpool 创建一个无缓冲通道 out 来存储结果,但只有在所有结果写入 out 后才返回。但由于从 out 通道的读取发生在 out 返回之后,并且 out 没有缓冲,因此 workerpool 在尝试写入时被阻塞,从而导致死锁。这就是为什么看起来每个工作人员只发送一个值;实际上,在发送第一个之后,所有工作人员都被阻止,因为没有任何东西可以接收该值(您可以通过在写入 outAlihkan kenyataan cetakan kemudian untuk melihat ini)

Betulkan pilihan termasuk membuat out 有一个大小为 n = 结果数 的缓冲区(即 out := make(chan int, n))或使 out 不缓冲并在写入时从 out melakukan bacaan.

  1. done 频道也没有被正确使用。 mainworkerpool kedua-duanya bergantung padanya untuk menghentikan pelaksanaan, tetapi tiada apa yang ditulis kepadanya! Ia juga tidak dibuffer dan oleh itu mengalami masalah kebuntuan yang disebutkan di atas.

Untuk menyelesaikan isu ini, anda boleh terlebih dahulu menyelesaikan kebuntuan daripada workerpool 中删除 case <-done: 并简单地通过 in 进行范围,因为它在 main 中关闭。然后可以将donemenetapkan kepada saluran penimbal.

Gabungkan pembaikan ini untuk mendapatkan:

package main

import (
    "fmt"
    "sync"
)

func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for data := range in {
                // fan-in job execution multiplexing results into the results channel
                fmt.Println("running func", data)
                value := fn(data)
                fmt.Println("sending value out", value)
                out <- value

            }
            fmt.Println("no more items")
            return
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    done <- true
    close(done)
    return out
}

func main() {
    done := make(chan bool, 1)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        }
    }

}

Ini mungkin menyelesaikan masalah anda, tetapi ini bukan cara terbaik untuk menggunakan saluran! Struktur itu sendiri boleh diubah dengan lebih mudah tanpa perlu bergantung pada saluran penimbal.

Atas ialah kandungan terperinci Berbilang goroutine membaca dari saluran yang sama. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam