首页  >  文章  >  后端开发  >  Go Singleflight 融入您的代码中,而不是您的数据库中

Go Singleflight 融入您的代码中,而不是您的数据库中

Linda Hamilton
Linda Hamilton原创
2024-11-05 12:27:02508浏览

原文发布在VictoriaMetrics博客上:https://victoriametrics.com/blog/go-singleflight/

这篇文章是关于 Go 中处理并发的系列文章的一部分:

  • Gosync.Mutex:正常和饥饿模式
  • Gosync.WaitGroup 和对齐问题
  • Gosync.Pool 及其背后的机制
  • Gosync.Cond,最被忽视的同步机制
  • Gosync.Map:适合正确工作的正确工具
  • Go Sync.Once 很简单...真的吗?
  • Go Singleflight 融入您的代码,而不是您的数据库(我们在这里)

Go Singleflight Melts in Your Code, Not in Your DB

Go Singleflight 融入您的代码,而不是您的数据库

因此,当您同时收到多个请求相同的数据时,默认行为是每个请求都会单独访问数据库以获取相同的信息。这意味着您最终会多次执行相同的查询,老实说,这效率很低。

Go Singleflight Melts in Your Code, Not in Your DB

多个相同的请求到达数据库

它最终会给数据库带来不必要的负载,这可能会减慢一切,但有一种方法可以解决这个问题。

这个想法是只有第一个请求实际上才会发送到数据库。其余请求等待第一个请求完成。一旦数据从初始请求返回,其他请求就会得到相同的结果 - 不需要额外的查询。

Go Singleflight Melts in Your Code, Not in Your DB

singleflight 如何抑制重复请求

那么,现在您已经很清楚这篇文章的内容了,对吧?

单程航班

Go 中的 singleflight 包是专门为处理我们刚才讨论的问题而构建的。请注意,它不是标准库的一部分,但由 Go 团队维护和开发。

singleflight 的作用是确保只有一个 goroutine 实际运行该操作,例如从数据库获取数据。它只允许在任何给定时刻对同一条数据(称为“密钥”)执行一次“进行中”(正在进行的)操作。

因此,如果其他 goroutine 在该操作仍在进行时请求相同的数据(相同的键),它们只会等待。然后,当第一个操作完成时,所有其他操作都会得到相同的结果,而无需再次运行该操作。

好了,说得够多了,让我们深入了解一下 singleflight 的实际工作原理:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

这里发生了什么:

我们正在模拟这样的情况:5 个 goroutine 几乎同时尝试获取相同的数据,间隔 60 毫秒。为了简单起见,我们使用随机数来模拟从数据库中获取的数据。

使用 singleflight.Group,我们确保只有第一个 goroutine 实际运行 fetchData(),其余的 goroutine 等待结果。

行 v, err, shared := g.Do("key-fetch-data", fetchData) 分配一个唯一的键 ("key-fetch-data") 来跟踪这些请求。因此,如果另一个 goroutine 请求相同的键,而第一个 goroutine 仍在获取数据,它会等待结果而不是开始新的调用。

Go Singleflight Melts in Your Code, Not in Your DB

单次飞行演示

一旦第一个调用完成,任何等待的 goroutine 都会得到相同的结果,正如我们在输出中看到的那样。虽然我们有 5 个 goroutine 请求数据,但 fetchData 只运行了两次,这是一个巨大的提升。

共享标志确认结果已在多个 goroutine 之间重用。

“但是为什么第一个 goroutine 的共享标志为 true?我以为只有等待的 goroutine 才会共享 == true?”

是的,如果您认为只有等待的 goroutine 应该共享 == true,这可能会感觉有点违反直觉。

问题是,g.Do 中的共享变量告诉您​​结果是否在多个调用者之间共享。它基本上是在说:“嘿,这个结果被多个调用者使用了。”这与谁运行该函数无关,它只是一个信号,表明结果在多个 goroutine 之间重用。

“我有缓存,为什么我需要单次飞行?”

简短的答案是:缓存和 singleflight 解决不同的问题,而且它们实际上可以很好地协同工作。

在使用外部缓存(如 Redis 或 Memcached)的设置中,singleflight 增加了额外的保护层,不仅为您的数据库,也为缓存本身。

Go Singleflight Melts in Your Code, Not in Your DB

Singleflight 与缓存系统一起工作

此外,singleflight 有助于防止缓存未命中风暴(有时称为“缓存踩踏”)。

通常,当请求请求数据时,如果数据在缓存中,那就太好了 - 这是缓存命中。如果数据不在缓存中,则为缓存未命中。假设在重建缓存之前有 10,000 个请求同时到达系统,数据库可能会突然同时受到 10,000 个相同查询的冲击。

在此高峰期间,singleflight 确保这 10,000 个请求中只有一个真正到达数据库。

但是稍后,在内部实现部分,我们将看到 singleflight 使用全局锁来保护正在进行的调用的映射,这可能成为每个 goroutine 的单点争用。这可能会减慢速度,尤其是在处理高并发时。

下面的模型可能更适合具有多个 CPU 的机器:

Go Singleflight Melts in Your Code, Not in Your DB

缓存未命中时的单次飞行

在此设置中,我们仅在发生缓存未命中时使用 singleflight。

单次航班运营

要使用 singleflight,您首先创建一个 Group 对象,它是跟踪链接到特定键的正在进行的函数调用的核心结构。

它有两个有助于防止重复调用的关键方法:

  • group.Do(key, func):运行函数,同时抑制重复请求。当您调用 Do 时,您传入一个键和一个函数,如果该键没有发生其他执行,则该函数将运行。如果同一个键已经有一个执行正在进行,您的调用将阻塞,直到第一个执行完成并返回相同的结果。
  • group.DoChan(key, func):与 group.Do 类似,但它不是阻塞,而是为您提供一个通道(

我们已经在演示中了解了如何使用 g.Do(),让我们看看如何使用经过修改的包装函数的 g.DoChan() :

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

说实话,这里使用 DoChan() 与 Do() 相比并没有太大变化,因为我们仍在等待通道接收操作 (

DoChan() 的闪光点是当你想要启动一个操作并在不阻塞 goroutine 的情况下执行其他操作时。例如,您可以使用通道更干净地处理超时或取消:

package singleflight

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

此示例还提出了您在现实场景中可能遇到的一些问题:

  • 由于网络响应缓慢、数据库无响应等原因,第一个 Goroutine 可能会比预期花费更长的时间。在这种情况下,所有其他等待的 Goroutine 的卡住时间都会比您希望的要长。超时可以在这里提供帮助,但任何新请求仍然会在第一个请求之后等待。
  • 您获取的数据可能会经常更改,因此当第一个请求完成时,结果可能已经过时。这意味着我们需要一种方法来使密钥无效并触发新的执行。

是的,singleflight 提供了一种使用 group.Forget(key) 方法来处理此类情况的方法,它可以让您放弃正在进行的执行。

Forget() 方法从跟踪正在进行的函数调用的内部映射中删除一个键。这有点像“使键无效”,因此如果您使用该键再次调用 g.Do(),它将像新请求一样执行该函数,而不是等待上一次执行完成。

让我们更新示例以使用 Forget() 并查看该函数实际被调用了多少次:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

Goroutine 0 和 Goroutine 1 都使用相同的键(“key-fetch-data”)调用 Do(),它们的请求合并为一次执行,结果在两个 Goroutine 之间共享。

Goroutine 2,另一方面,在运行 Do() 之前调用 Forget()。这会清除与“key-fetch-data”相关的任何先前结果,因此它会触发该函数的新执行。

总而言之,虽然 singleflight 很有用,但它仍然可能存在一些边缘情况,例如:

  • 如果第一个 goroutine 被阻塞的时间太长,所有等待它的其他 goroutine 也会被卡住。在这种情况下,使用超时上下文或带有超时的 select 语句可能是更好的选择。
  • 如果第一个请求返回错误或恐慌,相同的错误或恐慌将传播到等待结果的所有其他 goroutine。

如果您已经注意到我们讨论过的所有问题,让我们深入到下一部分来讨论 singleflight 的实际工作原理。

单次飞行如何运作

通过使用singleflight,你可能已经对它的内部工作原理有了一个基本的了解,singleflight的整个实现只有大约150行代码。

基本上,每个唯一的键都有一个管理其执行的结构。如果 goroutine 调用 Do() 并发现 key 已经存在,则该调用将被阻塞,直到第一次执行完成,结构如下:

// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

这里使用了两个同步原语:

  • 组互斥锁 (g.mu):该互斥锁保护整个键映射,而不是每个键一个锁,它确保添加或删除键是线程安全的。
  • WaitGroup (g.call.wg):WaitGroup 用于等待与特定键关联的第一个 goroutine 完成其工作。

这里我们将重点关注 group.Do() 方法,因为另一个方法 group.DoChan() 的工作方式类似。 group.Forget() 方法也很简单,因为它只是从地图中删除键。

当你调用 group.Do() 时,它所做的第一件事就是锁定整个调用映射 (g.mu)。

“这对性能不是很不利吗?”

是的,它可能并不适合每种情况下的性能(总是先进行基准测试),因为 singleflight 锁定了整个密钥。如果您的目标是获得更好的性能或大规模工作,一个好的方法是分片或分发密钥。您可以将负载分散到多个组,而不是仅使用一个单一飞行组,有点像“多重飞行”

作为参考,请查看此存储库:shardedsingleflight。

现在,一旦获得锁,该组就会查看内部映射 (g.m),如果已经有对给定密钥的正在进行或已完成的调用。该地图跟踪任何正在进行或已完成的工作,并将键映射到相应的任务。

如果找到该键(另一个 goroutine 已经在运行该任务),我们只需增加一个计数器(c.dups)来跟踪重复请求,而不是开始新的调用。然后,goroutine 释放锁并通过在关联的 WaitGroup 上调用 call.wg.Wait() 来等待原始任务完成。

当原始任务完成时,这个 goroutine 会获取结果并避免再次运行该任务。

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

如果没有其他 Goroutine 正在处理该键,则当前 Goroutine 负责执行该任务。

此时,我们创建一个新的调用对象,将其添加到映射中,并初始化其 WaitGroup。然后,我们解锁互斥体并继续通过辅助方法 g.doCall(c, key, fn) 自己执行任务。当任务完成时,任何等待的 goroutine 都会被 wg.Wait() 调用解除阻塞。

这里没什么太疯狂的,除了我们如何处理错误之外,还有三种可能的情况:

  • 如果函数发生恐慌,我们会捕获它,将其包装在一个恐慌错误中,然后引发恐慌。
  • 如果函数返回 errGoexit,我们调用 runtime.Goexit() 来正确退出 goroutine。
  • 如果这只是一个正常错误,我们会在调用时设置该错误。

这是辅助方法 g.doCall() 中事情开始变得更加聪明的地方。

“等等,什么是runtime.Goexit()?”

在深入代码之前,让我快速解释一下,runtime.Goexit() 用于停止 goroutine 的执行。

当 goroutine 调用 Goexit() 时,它会停止,并且任何延迟函数仍然按照后进先出 (LIFO) 顺序运行,就像正常情况一样。它与恐慌类似,但有一些区别:

  • 它不会触发恐慌,所以你无法用recover()捕获它。
  • 只有调用 Goexit() 的 goroutine 被终止,所有其他 goroutine 都保持正常运行。

现在,这是一个有趣的怪癖(与我们的主题没有直接关系,但值得一提)。如果你在主协程中调用runtime.Goexit()(比如在main()内部),请检查一下:

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
    callCount.Add(1)
    time.Sleep(100 * time.Millisecond)
    return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    time.Sleep(time.Duration(id) * 40 * time.Millisecond)
    v, err, shared := g.Do("key-fetch-data", fetchData)
    if err != nil {
        return err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
    return nil
}

func main() {
    var g singleflight.Group

    // 5 goroutines to fetch the same data
    const numGoroutines = 5
    wg.Add(numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        go fetchDataWrapper(&g, i)
    }

    wg.Wait()
    fmt.Printf("Function was called %d times\n", callCount.Load())
}

// Output:
// Goroutine 0: result: 90, shared: true
// Goroutine 2: result: 90, shared: true
// Goroutine 1: result: 90, shared: true
// Goroutine 3: result: 13, shared: true
// Goroutine 4: result: 13, shared: true
// Function was called 2 times

发生的情况是 Goexit() 终止了主 goroutine,但如果还有其他 goroutine 仍在运行,程序会继续运行,因为只要至少有一个 goroutine 处于活动状态,Go 运行时就会保持活动状态。然而,一旦没有剩下 goroutines,它就会因“no goroutine”错误而崩溃,这是一个有趣的小角落案例。

现在,回到我们的代码,如果runtime.Goexit()仅终止当前的goroutine并且无法被recover()捕获,我们如何检测它是否被调用?

关键在于,当调用runtime.Goexit()时,其后面的任何代码都不会被执行。

// Wrap the fetchData function with singleflight using DoChan
func fetchDataWrapper(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)

    res := <-ch
    if res.Err != nil {
        return res.Err
    }

    fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    return nil
}

在上面的情况下,调用runtime.Goexit()之后,normalReturn = true这一行永远不会被执行。因此,在 defer 内部,我们可以检查 normalReturn 是否仍然为 false,以检测是否调用了特殊方法。

下一步是确定任务是否出现恐慌。为此,我们使用recover()作为正常返回,尽管singleflight中的实际代码有点微妙:

package singleflight

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

这段代码不是直接在recover块内设置recovered = true,而是通过在recover()块之后将recovery设置为最后一行来获得一点奇特的效果。

那么,为什么这会起作用?

当调用runtime.Goexit()时,它会终止整个goroutine,就像panic()一样。然而,如果panic()被恢复,只有panic()和recover()之间的函数链被终止,而不是整个goroutine。

Go Singleflight Melts in Your Code, Not in Your DB

singleflight中panic和runtime.Goexit()的处理

这就是为什么在包含recover()的defer之外设置recovered = true,它只在两种情况下执行:当函数正常完成时或当恐慌恢复时,但在调用runtime.Goexit()时不会执行。

接下来,我们将讨论如何处理每个案例。

func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error {
    defer wg.Done()

    ch := g.DoChan("key-fetch-data", fetchData)
    select {
    case res := <-ch:
        if res.Err != nil {
            return res.Err
        }
        fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared)
    case <-time.After(50 * time.Millisecond):
        return fmt.Errorf("timeout waiting for result")
    }

  return nil
}

如果任务在执行过程中发生紧急情况,则会捕获紧急情况并将其保存在 c.err 中作为紧急错误,其中包含紧急值和堆栈跟踪。 singleflight 捕捉到恐慌并优雅地清理,但它不会吞掉它,它会在处理其状态后重新抛出恐慌。

这意味着执行任务的 Goroutine(第一个开始执行操作的 Goroutine)会发生恐慌,并且所有其他等待结果的 Goroutine 也会发生恐慌。

由于这种恐慌发生在开发人员的代码中,因此我们有责任妥善处理它。

现在,我们仍然需要考虑一种特殊情况:当其他 goroutine 使用 group.DoChan() 方法并通过通道等待结果时。在这种情况下,singleflight 不能在这些 goroutine 中发生恐慌。相反,它会执行所谓的不可恢复的恐慌(gopanic(e)),这会使我们的应用程序崩溃。

最后,如果任务调用了runtime.Goexit(),则不需要采取任何进一步的操作,因为goroutine已经处于关闭过程中,我们只是让它发生而不干扰。

差不多就是这样,除了我们讨论过的特殊情况之外,没有什么太复杂的。

保持联系

大家好,我是 Phuong Le,VictoriaMetrics 的软件工程师。上述写作风格注重清晰和简单,以易于理解的方式解释概念,即使它并不总是与学术精度完全一致。

如果您发现任何过时的内容或有疑问,请随时与我们联系。您可以在 X(@func25) 上给我留言。

您可能感兴趣的其他一些帖子:

  • Go I/O 读取器、写入器和动态数据。
  • Go 数组如何工作以及如何使用 For-Range
  • Go 中的切片:变大或回家
  • Go Maps 解释:键值对实际上是如何存储的
  • Golang Defer:从基础到陷阱
  • 供应商,或 go mod 供应商:这是什么?

我们是谁

如果您想监控您的服务、跟踪指标并了解一切的执行情况,您可能需要查看 VictoriaMetrics。这是一种快速、开源且节省成本的方式来监控您的基础设施。

我们是 Gophers,热爱研究、实验和分享 Go 及其生态系统知识的爱好者。

以上是Go Singleflight 融入您的代码中,而不是您的数据库中的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn