Maison >développement back-end >Golang >Canaux tampons synchrones et groupes d'attente

Canaux tampons synchrones et groupes d'attente

PHPz
PHPzavant
2024-02-09 08:09:191141parcourir

Canaux tampons synchrones et groupes dattente

L'éditeur PHP Zimo introduit les canaux tampons synchronisés et les groupes d'attente, qui sont une technologie couramment utilisée dans la programmation simultanée. Les canaux tampons synchrones permettent le transfert de données entre plusieurs threads et réalisent la synchronisation entre les threads via des tampons. Le groupe d'attente permet de gérer un groupe de threads, en attendant qu'une condition spécifique soit remplie avant de les exécuter simultanément. Ces deux technologies peuvent résoudre efficacement le problème de synchronisation entre les threads dans la programmation multi-thread et améliorer les performances de concurrence et la fiabilité du programme.

Contenu de la question

J'utilise waitgroupbuffered 通道时遇到问题。问题是 waitgroup pour fermer avant que la chaîne ne soit entièrement lue, ce qui rend ma chaîne à moitié lue et s'interrompt au milieu.

func main() {
    var wg sync.waitgroup
    var err error

    start := time.now()
    students := make([]studentdetails, 0)
    studentch := make(chan studentdetail, 10000)
    errorch := make(chan error, 1)

    wg.add(1)

    go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false)
    go func(ch chan studentdetail, e chan error) {
    
    loop:
        for {
            select {
            case p, ok := <-ch:
                if ok {
                    l.printf("links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                } else {
                    l.print("closed channel")
                    break loop
                }
            case err = <-e:
                if err != nil {
                    break
                }
            }
        }
    }(studentch, errorch)
    wg.wait()
    close(studentch)
    close(errorch)
    l.warnln("closed: all wait-groups completed!")
    l.warnf("total items fetched: %d", len(students))

    elapsed := time.since(start)
    l.warnf("operation took %s", elapsed)
}

Le problème est que cette fonction est recursive。我的意思是一些 http 调用来获取 students et effectue ensuite plus d'appels en fonction de la condition.

func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    util.MustNotNil(rCtx)
    L := logger.GetLogger(rCtx)
    defer func() {
        L.Println("Closing all waitgroup!")
        wg.Done()
    }()

    wc := getWC()
    httpClient := wc.Registry.MustHTTPClient()
    res, err := httpClient.Get(url)
    if err != nil {
        L.Fatal(err)
    }
    defer res.Body.Close()
    if res.StatusCode != 200 {
        L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
        errorCh <- errors.New("service_status_code")
        return
    }

    // parse response and return error if found some through errorCh as done above.
    // decide page subSection based on response if it is more.
    if !subSection {
        wg.Add(1)
        go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }

    // Find students from response list and parse each Student
    students := s.parseStudentItemList(rCtx, item)
    for _, student := range students {
        content <- student
    }
 
    L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}

Modifiez les variables pour éviter la base de code d'origine.

Le problème est qu'une fois le groupe d'attente terminé, les élèves sont lus au hasard. Je souhaite continuer à exécuter jusqu'à ce que tous les étudiants aient lu, et si une erreur se produit, elle devrait s'interrompre dès qu'elle rencontre une erreur.

Solution de contournement

Vous devez savoir quand la goroutine de réception est terminée. Le groupe d'attente fait cela pour générer des goroutines. Vous pouvez donc utiliser deux groupes d'attente :

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wgReader.Done()
    ...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer