首頁  >  文章  >  後端開發  >  Go Singleflight 融入您的程式碼中,而不是您的資料庫中

Go Singleflight 融入您的程式碼中,而不是您的資料庫中

Linda Hamilton
Linda Hamilton原創
2024-11-05 12:27:02588瀏覽

原文發佈在VictoriaMetrics部落格:https://victoriametrics.com/blog/go-singleflight/

這篇文章是關於 Go 中處理並發的系列文章的一部分:

  • Gosync.Mutex:正常與飢餓模式
  • Gosync.WaitGroup 與對齊問題
  • Gosync.Pool 及其背後的機制
  • Gosync.Cond,最被忽略的同步機制
  • Gosync.Map:適合正確工作的正確工具
  • Go Sync.Once 很簡單...真的嗎?
  • Go Singleflight 融入您的程式碼,而不是您的資料庫(我們在這裡)

Go Singleflight Melts in Your Code, Not in Your DB

Go Singleflight 融入您的程式碼,而不是您的資料庫

因此,當您同時收到多個請求相同的資料時,預設行為是每個請求都會單獨存取資料庫以獲取相同的資訊。這意味著您最終會執行多次相同的查詢,老實說,這效率很低。

Go Singleflight Melts in Your Code, Not in Your DB

多個相同的請求到達資料庫

它最終會為資料庫帶來不必要的負載,這可能會減慢一切,但有一種方法可以解決這個問題。

這個想法是只有第一個請求實際上才會傳送到資料庫。其餘請求等待第一個請求完成。一旦資料從初始請求返回,其他請求就會得到相同的結果 - 不需要額外的查詢。

Go Singleflight Melts in Your Code, Not in Your DB

singleflight 如何抑制重複請求

那麼,現在您已經很清楚這篇文章的內容了,對吧?

單程航班

Go 中的 singleflight 套件是專門為處理我們剛才討論的問題而建造的。請注意,它不是標準庫的一部分,但由 Go 團隊維護和開發。

singleflight 的作用是確保只有一個 goroutine 實際執行該操作,例如從資料庫取得資料。它只允許在任何給定時刻對同一條資料(稱為“密鑰”)執行一次“進行中”(正在進行的)操作。

因此,如果其他 goroutine 在該操作仍在進行時請求相同的資料(相同的鍵),它們只會等待。然後,當第一個操作完成時,所有其他操作都會得到相同的結果,而無需再次執行該操作。

好了,說得夠多了,讓我們深入了解 singleflight 的實際運作原理:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

這裡發生了什麼事:

我們正在模擬這樣的情況:5 個 goroutine 幾乎同時嘗試獲取相同的數據,間隔 60 毫秒。為了簡單起見,我們使用隨機數來模擬從資料庫中獲得的資料。

使用 singleflight.Group,我們確保只有第一個 goroutine 實際運行 fetchData(),其餘的 goroutine 等待結果。

行 v, err, shared := g.Do("key-fetch-data", fetchData) 分配一個唯一的鍵 ("key-fetch-data") 來追蹤這些請求。因此,如果另一個 goroutine 請求相同的鍵,而第一個 goroutine 仍在獲取數據,它會等待結果而不是開始新的呼叫。

Go Singleflight Melts in Your Code, Not in Your DB

單次飛行示範

一旦第一個呼叫完成,任何等待的 goroutine 都會得到相同的結果,正如我們在輸出中看到的那樣。雖然我們有 5 個 goroutine 請求數據,但 fetchData 只運行了兩次,這是一個巨大的提升。

共享標誌確認結果已在多個 goroutine 之間重複使用。

「但是為什麼第一個 goroutine 的共享標誌為 true?我以為只有等待的 goroutine 才會共用 == true?」

是的,如果您認為只有等待的 goroutine 應該共享 == true,這可能會感覺有點違反直覺。

問題是,g.Do 中的共享變數告訴您結果是否在多個呼叫者之間共用。它基本上是在說:「嘿,這個結果被多個呼叫者使用了。」這與誰運行該函數無關,它只是一個信號,表明結果在多個 goroutine 之間重複使用。

「我有緩存,為什麼我需要單次飛行?」

簡短的答案是:快取和 singleflight 解決不同的問題,而且它們實際上可以很好地協同工作。

在使用外部快取(如 Redis 或 Memcached)的設定中,singleflight 增加了額外的保護層,不僅為您的資料庫,也為快取本身。

Go Singleflight Melts in Your Code, Not in Your DB

Singleflight 與快取系統一起工作

此外,singleflight 有助於防止快取未命中風暴(有時稱為「快取踩踏」)。

通常,當請求請求資料時,如果資料在快取中,那就太好了 - 這是快取命中。如果資料不在快取中,則為快取未命中。假設在重建快取之前有 10,000 個請求同時到達系統,資料庫可能會突然同時受到 10,000 個相同查詢的衝擊。

在此高峰期間,singleflight 確保這 10,000 個請求中只有一個真正到達資料庫。

但是稍後,在內部實作部分,我們將看到 singleflight 使用全域鎖來保護正在進行的呼叫的映射,這可能會成為每個 goroutine 的單點爭用。這可能會減慢速度,尤其是在處理高並發時。

下面的模型可能更適合具有多個 CPU 的機器:

Go Singleflight Melts in Your Code, Not in Your DB

緩存未命中時的單次飛行

在此設定中,我們只在發生快取未命中時使用 singleflight。

單次航班營運

要使用 singleflight,您首先建立一個 Group 對象,它是追蹤連結到特定鍵的正在進行的函數呼叫的核心結構。

它有兩個有助於防止重複呼叫的關鍵方法:

  • group.Do(key, func):執行函數,同時抑制重複請求。當您呼叫 Do 時,您傳入一個鍵和一個函數,如果該鍵沒有發生其他執行,則該函數將運行。如果同一個鍵已經有一個執行正在進行,您的呼叫將阻塞,直到第一個執行完成並傳回相同的結果。
  • group.DoChan(key, func):與 group.Do 類似,但它不是阻塞,而是為您提供一個通道(

我們已經在示範中了解如何使用 g.Do(),讓我們看看如何使用經過修改的包裝函數的 g.DoChan() :

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

說實話,這裡使用 DoChan() 與 Do() 相比並沒有太大變化,因為我們仍在等待通道接收操作 (

DoChan() 的閃光點是當你想要啟動一個操作並在不阻塞 goroutine 的情況下執行其他操作。例如,您可以使用通道更乾淨地處理逾時或取消:

package singleflight

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

此範例也提出了您在現實場景中可能遇到的一些問題:

  • 由於網路反應緩慢、資料庫無回應等原因,第一個 Goroutine 可能會比預期花費更長的時間。在這種情況下,所有其他等待的 Goroutine 的卡住時間都會比您希望的要長。超時可以在這裡提供幫助,但任何新請求仍然會在第一個請求之後等待。
  • 您取得的資料可能會經常更改,因此當第一個請求完成時,結果可能已經過時。這意味著我們需要一種方法來使金鑰無效並觸發新的執行。

是的,singleflight 提供了一種使用 group.Forget(key) 方法來處理此類情況的方法,它可以讓您放棄正在進行的執行。

Forget() 方法從追蹤正在進行的函數呼叫的內部映射中刪除一個鍵。這有點像“使鍵無效”,因此如果您使用該鍵再次呼叫 g.Do(),它將像新請求一樣執行該函數,而不是等待上一次執行完成。

讓我們更新範例以使用 Forget() 並查看該函數實際被呼叫了多少次:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

Goroutine 0 和 Goroutine 1 都使用相同的鍵(“key-fetch-data”)呼叫 Do(),它們的請求合併為一次執行,結果在兩個 Goroutine 之間共用。

Goroutine 2,另一方面,在執行 Do() 之前呼叫 Forget()。這會清除與「key-fetch-data」相關的任何先前結果,因此它會觸發該函數的新執行。

總而言之,雖然 singleflight 很有用,但它仍然可能存在一些邊緣情況,例如:

  • 如果第一個 goroutine 被阻塞的時間太長,所有等待它的其他 goroutine 也會被卡住。在這種情況下,使用逾時上下文或帶有逾時的 select 語句可能是更好的選擇。
  • 如果第一個請求回傳錯誤或恐慌,相同的錯誤或恐慌將傳播到等待結果的所有其他 goroutine。

如果您已經注意到我們討論過的所有問題,讓我們深入到下一部分來討論 singleflight 的實際工作原理。

單次飛行如何運作

透過使用singleflight,你可能已經對它的內部運作有了基本的了解,singleflight的整個實作只有大約150行程式碼。

基本上,每個唯一的鍵都有一個管理其執行的結構。如果 goroutine 呼叫 Do() 並發現 key 已經存在,則該呼叫將被阻塞,直到第一次執行完成,結構如下:

// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

這裡使用了兩個同步原語:

  • 群組互斥鎖 (g.mu):此互斥鎖保護整個鍵映射,而不是每個鍵一個鎖,它確保新增或刪除鍵是執行緒安全的。
  • WaitGroup (g.call.wg):WaitGroup 用於等待與特定鍵關聯的第一個 goroutine 完成其工作。

這裡我們將重點放在 group.Do() 方法,因為另一個方法 group.DoChan() 的工作方式類似。 group.Forget() 方法也很簡單,因為它只是從地圖中刪除鍵。

當你呼叫 group.Do() 時,它所做的第一件事就是鎖定整個呼叫映射 (g.mu)。

「這對效能不是很不利嗎?」

是的,它可能不適合每種情況下的效能(總是先進行基準測試),因為 singleflight 鎖定了整個金鑰。如果您的目標是獲得更好的效能或大規模工作,一個好的方法是分片或分發金鑰。您可以將負載分散到多個組,而不是僅使用單一飛行組,有點像「多重飛行」

作為參考,請查看此儲存庫:shardedsingleflight。

現在,一旦獲得鎖,該群組就會查看內部映射 (g.m),如果已經有對給定密鑰的正在進行或已完成的呼叫。該地圖追蹤任何正在進行或已完成的工作,並將鍵映射到相應的任務。

如果找到該鍵(另一個 goroutine 已經在運行該任務),我們只需增加一個計數器(c.dups)來追蹤重複請求,而不是開始新的呼叫。然後,goroutine 釋放鎖定並透過在關聯的 WaitGroup 上呼叫 call.wg.Wait() 來等待原始任務完成。

當原始任務完成時,這個 goroutine 會取得結果並避免再次執行該任務。

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

如果沒有其他 Goroutine 正在處理該鍵,則當前 Goroutine 負責執行該任務。

此時,我們建立一個新的呼叫對象,將其新增至映射中,並初始化其 WaitGroup。然後,我們解鎖互斥體並繼續透過輔助方法 g.doCall(c, key, fn) 自行執行任務。當任務完成時,任何等待的 goroutine 都會被 wg.Wait() 呼叫解除阻塞。

這裡沒什麼太瘋狂的,除了我們如何處理錯誤之外,還有三種可能的情況:

  • 如果函數發生恐慌,我們會捕捉它,將其包裝在一個恐慌錯誤中,然後引發恐慌。
  • 如果函數回傳 errGoexit,我們呼叫 runtime.Goexit() 來正確退出 goroutine。
  • 如果這只是一個正常錯誤,我們會在呼叫時設定該錯誤。

這是輔助方法 g.doCall() 中事情開始變得更加聰明的地方。

「等等,什麼是runtime.Goexit()?」

在深入程式碼之前,讓我快速解釋一下,runtime.Goexit() 用來停止 goroutine 的執行。

當 goroutine 呼叫 Goexit() 時,它會停止,並且任何延遲函數仍然按照後進先出 (LIFO) 順序運行,就像正常情況一樣。它與恐慌類似,但有一些區別:

  • 它不會引發恐慌,所以你無法用recover()捕捉它。
  • 只有呼叫 Goexit() 的 goroutine 被終止,所有其他 goroutine 都保持正常運作。

現在,這是一個有趣的怪癖(與我們的主題沒有直接關係,但值得一提)。如果你在主協程中呼叫runtime.Goexit()(例如在main()內部),請檢查一下:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

發生的情況是 Goexit() 終止了主 goroutine,但如果還有其他 goroutine 仍在運行,程式會繼續運行,因為只要至少有一個 goroutine 處於活動狀態,Go 運行時就會保持活動狀態。然而,一旦沒有剩下 goroutines,它就會因“no goroutine”錯誤而崩潰,這是一個有趣的小角落案例。

現在,回到我們的程式碼,如果runtime.Goexit()僅終止目前的goroutine並且無法被recover()捕獲,我們如何偵測它是否被呼叫?

關鍵在於,當呼叫runtime.Goexit()時,其後面的任何程式碼都不會被執行。

// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

在上面的情況下,呼叫runtime.Goexit()之後,normalReturn = true這一行永遠不會被執行。因此,在 defer 內部,我們可以檢查 normalReturn 是否仍然為 false,以偵測是否呼叫了特殊方法。

下一步是確定任務是否出現恐慌。為此,我們使用recover()作為正常返回,儘管singleflight中的實際程式碼有點微妙:

package singleflight

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

這段程式碼不是直接在recover區塊內設定recovered = true,而是透過在recover()區塊之後將recovery設定為最後一行來獲得一點奇特的效果。

那麼,為什麼這會起作用?

當調用runtime.Goexit()時,它會終止整個goroutine,就像panic()一樣。然而,如果panic()被恢復,只有panic()和recover()之間的函數鏈被終止,而不是整個goroutine。

Go Singleflight Melts in Your Code, Not in Your DB

singleflight中panic和runtime.Goexit()的處理

這就是為什麼在包含recover()的defer之外設定recovered = true,它只在兩種情況下執行:當函數正常完成時或當恐慌恢復時,但在呼叫runtime.Goexit()時不會執行。

接下來,我們將討論如何處理每個案例。

func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)
    select {
    case res := <-ch:
        if res.Err != nil {
            return res.Err
        }
        fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    case <-time.After(50 * time.Millisecond):
        return fmt.Errorf("timeout waiting for result")
    }

  return nil
}

如果任務在執行過程中發生緊急情況,則會捕獲緊急情況並將其保存在 c.err 中作為緊急錯誤,其中包含緊急值和堆疊追蹤。 singleflight 捕捉到恐慌並優雅地清理,但它不會吞掉它,它會在處理其狀態後重新拋出恐慌。

這意味著執行任務的 Goroutine(第一個開始執行操作的 Goroutine)會發生恐慌,並且所有其他等待結果的 Goroutine 也會發生恐慌。

由於這種恐慌發生在開發人員的程式碼中,因此我們有責任妥善處理它。

現在,我們仍然需要考慮一種特殊情況:當其他 goroutine 使用 group.DoChan() 方法並透過通道等待結果時。在這種情況下,singleflight 不能在這些 goroutine 中發生恐慌。相反,它會執行所謂的不可恢復的恐慌(gopanic(e)),這會使我們的應用程式崩潰。

最後,如果任務呼叫了runtime.Goexit(),則不需要採取任何進一步的操作,因為goroutine已經處於關閉過程中,我們只是讓它發生而不干擾。

差不多就是這樣,除了我們討論過的特殊情況之外,沒有什麼太複雜的。

保持聯繫

大家好,我是 Phuong Le,VictoriaMetrics 的軟體工程師。上述寫作風格著重於清晰和簡單,以易於理解的方式解釋概念,即使它並不總是與學術精度完全一致。

如果您發現任何過時的內容或有疑問,請隨時與我們聯繫。您可以在 X(@func25) 上留言給我。

您可能感興趣的其他一些帖子:

  • Go I/O 讀取器、寫入器和動態資料。
  • Go 陣列如何運作以及如何使用 For-Range
  • Go 中的切片:變大或回家
  • Go Maps 解釋:鍵值對實際上是如何儲存的
  • Golang Defer:從基礎到陷阱
  • 供應商,或 go mod 供應商:這是什麼?

我們是誰

如果您想監控您的服務、追蹤指標並了解一切的執行情況,您可能需要查看 VictoriaMetrics。這是一種快速、開源且節省成本的方式來監控您的基礎設施。

我們是 Gophers,熱愛研究、實驗和分享 Go 及其生態系統知識的愛好者。

以上是Go Singleflight 融入您的程式碼中,而不是您的資料庫中的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn