首頁  >  文章  >  後端開發  >  Go 中的扇出-扇南模式

Go 中的扇出-扇南模式

PHPz
PHPz原創
2024-07-29 06:32:53624瀏覽

Fanout-Fanin Pattern in Go

在之前的兩篇文章中,我們分別介紹了 Fanout 和 Fanin。通常情況下,我們將它們一起使用,因為我們有一個資料流,我們希望單獨對專案進行操作,並且可以使用並發安全地執行此操作。因此,我們扇出到多個工作線程,然後扇回到單一流。

例如,假設您有一個很大的日誌檔案。您可以將文件分成多個區塊,允許每個工作人員同時操作文件的不同部分,然後合併結果。

如果您遵循前兩篇文章,這種模式是顯而易見的。如果您不確定,請參閱上面的連結。

// produce is simulating our single input as a channel
func produce() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- rand.Intn(50)
        }
        fmt.Printf("producer done\n")
        close(ch) // this is important!!!
    }()
    return ch
}

func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) {
    for value := range jobs {
        odd := "even"
        if (value & 1) == 1 {
            odd = "odd"
        }
        out <- OddEven{
            Number:  value,
            OddEven: odd,
        }
    }
    close(out) // remember this
    wg.Done()
}

// OddEven struct will be the result of the work done by each fanout thread
// and be the fanin data
type OddEven struct {
    Number  int
    OddEven string
}

func fanin(inputs []chan OddEven) chan OddEven {
    output := make(chan OddEven)

    var wg sync.WaitGroup
    for i, input := range inputs {
        wg.Add(1)
                // explicit params to capture loop vars
        go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) {
            for value := range input {
                output <- value
            }
            fmt.Printf("done merging source %d\n", id)
            wg.Done()
        }(i, input, output, &wg)
    }
    go func() {
        wg.Wait()
        close(output) // this is important!!!
    }()

    return output
}

func main() {
    // simulate the input data stream
    inputCh := produce()

    numWorkers := 3
    // fan-out to send data items to workers as individual jobs
    var wg sync.WaitGroup
    workerResults := make([]chan OddEven, numWorkers)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        workerResults[i] = make(chan OddEven)
        go worker(i, inputCh, workerResults[i], &wg)
    }
    go func() {
        wg.Wait()
    }()

    // fan-in the results
    results := fanin(workerResults)

    done := make(chan bool)
    go func() {
        for value := range results {
            fmt.Printf("got %d is %s\n", value.Number, value.OddEven)
        }
        close(done)
    }()
    <-done
    fmt.Println("done")
}

有一個 Produce() 函數可以建立模擬的數位輸入流。

有一個工作函數在輸入通道上運行,直到沒有更多資料為止。它對每個值「處理」輸入資料(確定該值是奇數還是偶數),然後將結果結構傳送到輸出通道。

請注意,當每個工作人員完成後,它會關閉其結果通道。這對於防止死鎖是必要的,因為否則 fanin 操作將休眠等待 chan 上的更多資料。

主線程從 Produce 取得輸入流,然後啟動許多工作線程,為每個工作線程提供自己的通道,以便發送結果。

這些結果通道隨後被傳送到 fanin 操作。為了扇入,我們建立一個通道來接收輸出,然後為每個工作通道啟動一個 goroutine。每個 goroutine 只是在通道上進行迭代,直到沒有更多資料為止,然後終止。 請記住,我們關閉了工作執行緒中的結果通道,這就是允許 for 迴圈終止的原因

請注意,我們使用 WaitGroup 進行 fanin 過程。這讓我們知道所有結果通道的所有結果何時已組合到輸出通道中。當發生這種情況時,我們關閉輸出通道,以便任何消耗輸出的下游執行緒都可以終止。

有了輸出通道中的所有數據,主執行緒就可以繼續顯示結果了。請注意,我們使用布林通道來防止主執行緒在一切完成之前終止;否則,它將終止進程。

請注意,還有另一種方法可以使用 select 語句進行扇入。這裡使用的技術更乾淨一些,因為我們可以增加或減少工人的數量。

另請注意,我們還沒有解決有關 SIGTERM 或 SIGINT 等提前終止的問題。這增加了一點複雜性。

你會如何實現這個? 扇出/扇入模式還有其他實作。請在下面留下您的評論和想法?

謝謝!

這篇文章以及本系列所有文章的程式碼可以在這裡找到

以上是Go 中的扇出-扇南模式的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn