Home >Backend Development >Golang >Synchronous buffer channels and wait groups

Synchronous buffer channels and wait groups

PHPz
PHPzforward
2024-02-09 08:09:191182browse

Synchronous buffer channels and wait groups

php editor Zimo introduces synchronization buffer channels and waiting groups, which is a commonly used technology in concurrent programming. Synchronous buffer channels allow data transfer between multiple threads and achieve synchronization between threads through buffers. The wait group is used to manage a group of threads, waiting for a specific condition to be met before executing them simultaneously. These two technologies can effectively solve the synchronization problem between threads in multi-thread programming and improve the concurrency performance and reliability of the program.

Question content

I have a problem using waitgroup with buffered channels. The problem is that the waitgroup closes before the channel is fully read, which makes my channel read halfway through and break in the middle.

func main() {
    var wg sync.waitgroup
    var err error

    start := time.now()
    students := make([]studentdetails, 0)
    studentch := make(chan studentdetail, 10000)
    errorch := make(chan error, 1)

    wg.add(1)

    go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false)
    go func(ch chan studentdetail, e chan error) {
    
    loop:
        for {
            select {
            case p, ok := <-ch:
                if ok {
                    l.printf("links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                } else {
                    l.print("closed channel")
                    break loop
                }
            case err = <-e:
                if err != nil {
                    break
                }
            }
        }
    }(studentch, errorch)
    wg.wait()
    close(studentch)
    close(errorch)
    l.warnln("closed: all wait-groups completed!")
    l.warnf("total items fetched: %d", len(students))

    elapsed := time.since(start)
    l.warnf("operation took %s", elapsed)
}

The problem is that this function is recursive. What I mean is some http calls to get students and then some more calls based on conditions.

func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    util.MustNotNil(rCtx)
    L := logger.GetLogger(rCtx)
    defer func() {
        L.Println("Closing all waitgroup!")
        wg.Done()
    }()

    wc := getWC()
    httpClient := wc.Registry.MustHTTPClient()
    res, err := httpClient.Get(url)
    if err != nil {
        L.Fatal(err)
    }
    defer res.Body.Close()
    if res.StatusCode != 200 {
        L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
        errorCh <- errors.New("service_status_code")
        return
    }

    // parse response and return error if found some through errorCh as done above.
    // decide page subSection based on response if it is more.
    if !subSection {
        wg.Add(1)
        go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }

    // Find students from response list and parse each Student
    students := s.parseStudentItemList(rCtx, item)
    for _, student := range students {
        content <- student
    }
 
    L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}

Change variables to avoid original code base.

The problem is that once the waiting group is completed, students are randomly read. I want to keep executing until all students have read, and if an error occurs, it should break as soon as it encounters an error.

Solution

You need to know when the receiving goroutine is finished. The waitgroup does this for spawning goroutines. So you can use two wait groups:

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wgReader.Done()
    ...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete

The above is the detailed content of Synchronous buffer channels and wait groups. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:stackoverflow.com. If there is any infringement, please contact admin@php.cn delete