首頁 >後端開發 >Golang >儘管將清單分割成由各個 goroutine 處理的較小的區塊,為什麼「moving_avg_concurrent2」的效能沒有隨著並發性的增加而提高?

儘管將清單分割成由各個 goroutine 處理的較小的區塊,為什麼「moving_avg_concurrent2」的效能沒有隨著並發性的增加而提高?

Linda Hamilton
Linda Hamilton原創
2024-12-23 16:38:13172瀏覽

Why is the performance of `moving_avg_concurrent2` not improving with increased concurrency, despite splitting the list into smaller chunks processed by individual goroutines?

為什麼 moving_avg_concurrent2 的效能無法隨著同時執行的增加而提升?

moving_avg_concurrent2 將清單拆分為較小的片段,並使用單一 goroutine 處理每個片段。出於某種原因(目前尚不清楚原因),由於某些原因,使用一個 goroutine 的函數比 moving_avg_serial4 更快,但使用多個 goroutine 的效能開始比 moving_avg_serial4 差。

為什麼 moving_avg_concurrent3 比 moving_avg_serial4 慢很多?

使用一個 goroutine 時,moving_avg_concurrent3 的效能比 moving_avg_serial4 差。雖然增加 num_goroutines 可以提高效能,但仍然比 moving_avg_serial4 差。

即使 goroutine 是輕量級的,它們也並非完全免費,是否可能產生的開銷如此之大,以至於速度甚至低於 moving_avg_serial4?

是的,雖然 goroutine 比較輕量級,但它們並不是免費的。當使用多個 goroutine 時,啟動、管理和調度這些 goroutine 的開銷可能會超過提升的並行度所獲得的好處。

程式碼

函數:

// 返回包含输入移动平均值的列表(已提供,即未优化)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // 初始化缓冲区为 NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !NaN_in_slice(buffer) && first_time {
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // 无循环的解决方案
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // 空输入
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// 返回包含输入移动平均值的列表
// 重新排列控制结构以利用短路求值
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // 初始化缓冲区为 NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            //            fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !NaN_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // 无循环的解决方案
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // 空输入
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// 返回包含输入移动平均值的列表
// 将列表拆分为较小的片段以使用 goroutine,但不使用串行版本,即我们仅在开头具有 NaN,因此希望减少一些开销
// 仍然不能扩展(随着大小和 num_goroutines 的增加,性能下降)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines
        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // 计算边界
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // 开始索引
                // 结束索引
                if go_id != (num_goroutines - 1) {
                    stop = start + n // 结束索引
                } else {
                    stop = num_items + (window_size - 1) // 结束索引
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)
                idx := 1

                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)

            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }

    } else { // 空输入
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// 返回包含输入移动平均值的列表
// 模式改变,我们选择主工作者模式并生成将由 goroutine 计算的每个窗口
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input))
        for i := 0; i < window_size-1; i++ {

以上是儘管將清單分割成由各個 goroutine 處理的較小的區塊,為什麼「moving_avg_concurrent2」的效能沒有隨著並發性的增加而提高?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn