Heim >Backend-Entwicklung >Golang >Synchrone Pufferkanäle und Wartegruppen
Der PHP-Editor Zimo führt synchronisierte Pufferkanäle und Wartegruppen ein, eine häufig verwendete Technologie bei der gleichzeitigen Programmierung. Synchrone Pufferkanäle ermöglichen die Datenübertragung zwischen mehreren Threads und erreichen über Puffer eine Synchronisierung zwischen Threads. Mit der Wartegruppe wird eine Gruppe von Threads verwaltet, die darauf wartet, dass eine bestimmte Bedingung erfüllt wird, bevor sie gleichzeitig ausgeführt wird. Diese beiden Technologien können das Synchronisationsproblem zwischen Threads bei der Multithread-Programmierung effektiv lösen und die Parallelitätsleistung und Zuverlässigkeit des Programms verbessern.
Ich verwende waitgroup
与 buffered
通道时遇到问题。问题是 waitgroup
, um zu schließen, bevor der Kanal vollständig gelesen ist, wodurch mein Kanal halb gelesen wird und in der Mitte unterbrochen wird.
func main() { var wg sync.waitgroup var err error start := time.now() students := make([]studentdetails, 0) studentch := make(chan studentdetail, 10000) errorch := make(chan error, 1) wg.add(1) go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false) go func(ch chan studentdetail, e chan error) { loop: for { select { case p, ok := <-ch: if ok { l.printf("links %s: [%s]\n", p.title, p.link) students = append(students, p) } else { l.print("closed channel") break loop } case err = <-e: if err != nil { break } } } }(studentch, errorch) wg.wait() close(studentch) close(errorch) l.warnln("closed: all wait-groups completed!") l.warnf("total items fetched: %d", len(students)) elapsed := time.since(start) l.warnf("operation took %s", elapsed) }
Das Problem ist, dass diese Funktion recursive
。我的意思是一些 http 调用来获取 students
ist und dann je nach Bedingung weitere Aufrufe durchführt.
func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) { util.MustNotNil(rCtx) L := logger.GetLogger(rCtx) defer func() { L.Println("Closing all waitgroup!") wg.Done() }() wc := getWC() httpClient := wc.Registry.MustHTTPClient() res, err := httpClient.Get(url) if err != nil { L.Fatal(err) } defer res.Body.Close() if res.StatusCode != 200 { L.Errorf("status code error: %d %s", res.StatusCode, res.Status) errorCh <- errors.New("service_status_code") return } // parse response and return error if found some through errorCh as done above. // decide page subSection based on response if it is more. if !subSection { wg.Add(1) go s.getDetailStudents(rCtx, content, errorCh, wg, link, true) // L.Warnf("total pages found %d", pageSub.Length()+1) } // Find students from response list and parse each Student students := s.parseStudentItemList(rCtx, item) for _, student := range students { content <- student } L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length()) }
Ändern Sie Variablen, um die ursprüngliche Codebasis zu vermeiden.
Das Problem besteht darin, dass die Schüler nach dem Zufallsprinzip vorgelesen werden, sobald die Wartegruppe abgeschlossen ist. Ich möchte mit der Ausführung fortfahren, bis alle Schüler es gelesen haben, und wenn ein Fehler auftritt, sollte es abbrechen, sobald ein Fehler auftritt.
Sie müssen wissen, wann die empfangende Goroutine fertig ist. Die Wartegruppe tut dies für das Spawnen von Goroutinen. Ihr könnt also zwei Wartegruppen nutzen:
wg.Add(1) go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false) wgReader.Add(1) go func(ch chan studentDetail, e chan error) { defer wgReader.Done() ... } wg.Wait() close(studentCh) close(errorCh) wgReader.Wait() // Wait for the readers to complete
Das obige ist der detaillierte Inhalt vonSynchrone Pufferkanäle und Wartegruppen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!