Heim  >  Artikel  >  Backend-Entwicklung  >  Synchrone Pufferkanäle und Wartegruppen

Synchrone Pufferkanäle und Wartegruppen

PHPz
PHPznach vorne
2024-02-09 08:09:191078Durchsuche

Synchrone Pufferkanäle und Wartegruppen

Der PHP-Editor Zimo führt synchronisierte Pufferkanäle und Wartegruppen ein, eine häufig verwendete Technologie bei der gleichzeitigen Programmierung. Synchrone Pufferkanäle ermöglichen die Datenübertragung zwischen mehreren Threads und erreichen über Puffer eine Synchronisierung zwischen Threads. Mit der Wartegruppe wird eine Gruppe von Threads verwaltet, die darauf wartet, dass eine bestimmte Bedingung erfüllt wird, bevor sie gleichzeitig ausgeführt wird. Diese beiden Technologien können das Synchronisationsproblem zwischen Threads bei der Multithread-Programmierung effektiv lösen und die Parallelitätsleistung und Zuverlässigkeit des Programms verbessern.

Frageninhalt

Ich verwende waitgroupbuffered 通道时遇到问题。问题是 waitgroup, um zu schließen, bevor der Kanal vollständig gelesen ist, wodurch mein Kanal halb gelesen wird und in der Mitte unterbrochen wird.

func main() {
    var wg sync.waitgroup
    var err error

    start := time.now()
    students := make([]studentdetails, 0)
    studentch := make(chan studentdetail, 10000)
    errorch := make(chan error, 1)

    wg.add(1)

    go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false)
    go func(ch chan studentdetail, e chan error) {
    
    loop:
        for {
            select {
            case p, ok := <-ch:
                if ok {
                    l.printf("links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                } else {
                    l.print("closed channel")
                    break loop
                }
            case err = <-e:
                if err != nil {
                    break
                }
            }
        }
    }(studentch, errorch)
    wg.wait()
    close(studentch)
    close(errorch)
    l.warnln("closed: all wait-groups completed!")
    l.warnf("total items fetched: %d", len(students))

    elapsed := time.since(start)
    l.warnf("operation took %s", elapsed)
}

Das Problem ist, dass diese Funktion recursive。我的意思是一些 http 调用来获取 students ist und dann je nach Bedingung weitere Aufrufe durchführt.

func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    util.MustNotNil(rCtx)
    L := logger.GetLogger(rCtx)
    defer func() {
        L.Println("Closing all waitgroup!")
        wg.Done()
    }()

    wc := getWC()
    httpClient := wc.Registry.MustHTTPClient()
    res, err := httpClient.Get(url)
    if err != nil {
        L.Fatal(err)
    }
    defer res.Body.Close()
    if res.StatusCode != 200 {
        L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
        errorCh <- errors.New("service_status_code")
        return
    }

    // parse response and return error if found some through errorCh as done above.
    // decide page subSection based on response if it is more.
    if !subSection {
        wg.Add(1)
        go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }

    // Find students from response list and parse each Student
    students := s.parseStudentItemList(rCtx, item)
    for _, student := range students {
        content <- student
    }
 
    L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}

Ändern Sie Variablen, um die ursprüngliche Codebasis zu vermeiden.

Das Problem besteht darin, dass die Schüler nach dem Zufallsprinzip vorgelesen werden, sobald die Wartegruppe abgeschlossen ist. Ich möchte mit der Ausführung fortfahren, bis alle Schüler es gelesen haben, und wenn ein Fehler auftritt, sollte es abbrechen, sobald ein Fehler auftritt.

Workaround

Sie müssen wissen, wann die empfangende Goroutine fertig ist. Die Wartegruppe tut dies für das Spawnen von Goroutinen. Ihr könnt also zwei Wartegruppen nutzen:

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wgReader.Done()
    ...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete

Das obige ist der detaillierte Inhalt vonSynchrone Pufferkanäle und Wartegruppen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen