一次又一次运行后,我有时会遇到这个问题。我知道这与计数器相关。当调用sync.waitgroup的done()方法的次数多于调用add()方法的次数时,它将抛出此错误。
如何解决这个问题?
我的代码创建了大小为 4 的批次,并对每个批次进行一些处理,但我在解决此恐慌时遇到了问题。
package main import ( "fmt" "sync" ) func main() { // create input channel input := make(chan int) // create wait group var wg sync.waitgroup // start batcher goroutine wg.add(1) go batcher(input, &wg) // send input values to the batcher for i := 1; i <= 10; i++ { input <- i } // close input channel close(input) // wait for batcher goroutine to finish wg.wait() } func batcher(input chan int, wg *sync.waitgroup) { // create batch channel with buffer of size 4 batch := make(chan int, 4) // create channel to synchronize worker goroutines done := make(chan bool) // create wait group for worker goroutines var workerwg sync.waitgroup // start worker goroutines for i := 0; i < 4; i++ { workerwg.add(1) go worker(batch, &workerwg, done) } // read input values and send to batch for value := range input { batch <- value if len(batch) == 4 { // wait for worker goroutines to finish processing batch workerwg.wait() // send batch to worker goroutines for i := 0; i < 4; i++ { workerwg.add(1) go sendbatch(batch, &workerwg, done) } } } // wait for worker goroutines to finish processing remaining batch workerwg.wait() // close done channel to notify that all batches have been processed close(done) wg.done() } func sendbatch(batch chan int, workerwg *sync.waitgroup, done chan bool) { // process batch for value := range batch { fmt.println("processing value:", value) } // notify worker goroutines that batch has been processed workerwg.done() select { case done <- true: default: // done channel has been closed } } func worker(batch chan int, workerwg *sync.waitgroup, done chan bool) { // process batches received from batch channel for batch := range batch { // process batch fmt.println("processing batch:", batch) workerwg.done() } // notify batcher goroutine that worker goroutine has finished select { case done <- true: default: // done channel has been closed } }
编写批处理程序的基本代码:
package main import ( "fmt" "sync" ) func main() { input := make(chan int) output := make(chan []int) var wg sync.waitgroup wg.add(2) // start the batcher goroutine go func() { batch := []int{} for value := range input { batch = append(batch, value) if len(batch) == 4 { output <- batch batch = []int{} } } if len(batch) > 0 { output <- batch } close(output) wg.done() }() // start the worker goroutine go func() { for batch := range output { sum := 0 for _, value := range batch { sum += value } fmt.printf("sum of batch %v: %d\n", batch, sum) } wg.done() }() // send input values to the batcher for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { input <- v } close(input) // wait for both goroutines to finish wg.wait() }
Sum of batch [1 2 3 4]: 10 Sum of batch [5 6 7 8]: 26 Sum of batch [9 10]: 19
早期的设计有点复杂,我会尝试扩展这个基本设计。
根据这段代码:
for i := 0; i < 4; i++ { workerwg.add(1) go worker(batch, &workerwg, done) }
我认为 workerwg.done()
应该移到循环之外:
func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) { + defer workerWg.Done() // process batches received from batch channel for batch := range batch { // process batch fmt.Println("Processing batch:", batch) - workerWg.Done() } // notify batcher goroutine that worker goroutine has finished select { case done <- true: default: // done channel has been closed } }
但是batch
在demo中并没有关闭。所以事实上,goroutine 将永远运行,直到程序结束。
不知道是否还有其他问题。设计太复杂了。复杂的代码难以理解并且容易出错。考虑重新设计它。
以上是如何解决此问题:恐慌:同步:负数 WaitGroup 计数器的详细内容。更多信息请关注PHP中文网其他相关文章!