Rumah >pembangunan bahagian belakang >Golang >Saluran penimbal segerak dan kumpulan tunggu

Saluran penimbal segerak dan kumpulan tunggu

PHPz
PHPzke hadapan
2024-02-09 08:09:191140semak imbas

Saluran penimbal segerak dan kumpulan tunggu

Editor PHP Zimo memperkenalkan saluran penimbal yang disegerakkan dan kumpulan menunggu, yang merupakan teknologi yang biasa digunakan dalam pengaturcaraan serentak. Saluran penimbal segerak membenarkan pemindahan data antara berbilang utas dan mencapai penyegerakan antara utas melalui penimbal. Kumpulan tunggu digunakan untuk mengurus kumpulan utas, menunggu syarat tertentu dipenuhi sebelum melaksanakannya secara serentak. Kedua-dua teknologi ini boleh menyelesaikan masalah penyegerakan antara utas dalam pengaturcaraan berbilang benang dengan berkesan dan meningkatkan prestasi serentak dan kebolehpercayaan program.

Kandungan soalan

Saya menggunakan waitgroupbuffered 通道时遇到问题。问题是 waitgroup untuk menutup sebelum saluran dibaca sepenuhnya, yang menjadikan saluran saya separuh dibaca dan pecah di tengah.

func main() {
    var wg sync.waitgroup
    var err error

    start := time.now()
    students := make([]studentdetails, 0)
    studentch := make(chan studentdetail, 10000)
    errorch := make(chan error, 1)

    wg.add(1)

    go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false)
    go func(ch chan studentdetail, e chan error) {
    
    loop:
        for {
            select {
            case p, ok := <-ch:
                if ok {
                    l.printf("links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                } else {
                    l.print("closed channel")
                    break loop
                }
            case err = <-e:
                if err != nil {
                    break
                }
            }
        }
    }(studentch, errorch)
    wg.wait()
    close(studentch)
    close(errorch)
    l.warnln("closed: all wait-groups completed!")
    l.warnf("total items fetched: %d", len(students))

    elapsed := time.since(start)
    l.warnf("operation took %s", elapsed)
}

Masalahnya ialah fungsi ini recursive。我的意思是一些 http 调用来获取 students dan kemudian membuat lebih banyak panggilan berdasarkan syarat.

func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    util.MustNotNil(rCtx)
    L := logger.GetLogger(rCtx)
    defer func() {
        L.Println("Closing all waitgroup!")
        wg.Done()
    }()

    wc := getWC()
    httpClient := wc.Registry.MustHTTPClient()
    res, err := httpClient.Get(url)
    if err != nil {
        L.Fatal(err)
    }
    defer res.Body.Close()
    if res.StatusCode != 200 {
        L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
        errorCh <- errors.New("service_status_code")
        return
    }

    // parse response and return error if found some through errorCh as done above.
    // decide page subSection based on response if it is more.
    if !subSection {
        wg.Add(1)
        go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }

    // Find students from response list and parse each Student
    students := s.parseStudentItemList(rCtx, item)
    for _, student := range students {
        content <- student
    }
 
    L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}

Tukar pembolehubah untuk mengelakkan asas kod asal.

Masalahnya ialah apabila kumpulan menunggu selesai, pelajar dibaca secara rawak. Saya mahu terus melaksanakan sehingga semua pelajar telah membaca, dan jika ralat berlaku, ralat itu akan pecah sebaik sahaja ia menghadapi ralat.

Penyelesaian

Anda perlu tahu bila goroutine penerima selesai. Kumpulan menunggu melakukan ini untuk pemijahan goroutine. Jadi anda boleh menggunakan dua kumpulan menunggu:

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wgReader.Done()
    ...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete

Atas ialah kandungan terperinci Saluran penimbal segerak dan kumpulan tunggu. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam
Artikel sebelumnya:Ketahui tentang "go mod vendor"Artikel seterusnya:Ketahui tentang "go mod vendor"