原文发布在VictoriaMetrics博客上:https://victoriametrics.com/blog/go-singleflight/
这篇文章是关于 Go 中处理并发的系列文章的一部分:
因此,当您同时收到多个请求相同的数据时,默认行为是每个请求都会单独访问数据库以获取相同的信息。这意味着您最终会多次执行相同的查询,老实说,这效率很低。
它最终会给数据库带来不必要的负载,这可能会减慢一切,但有一种方法可以解决这个问题。
这个想法是只有第一个请求实际上才会发送到数据库。其余请求等待第一个请求完成。一旦数据从初始请求返回,其他请求就会得到相同的结果 - 不需要额外的查询。
那么,现在您已经很清楚这篇文章的内容了,对吧?
Go 中的 singleflight 包是专门为处理我们刚才讨论的问题而构建的。请注意,它不是标准库的一部分,但由 Go 团队维护和开发。
singleflight 的作用是确保只有一个 goroutine 实际运行该操作,例如从数据库获取数据。它只允许在任何给定时刻对同一条数据(称为“密钥”)执行一次“进行中”(正在进行的)操作。
因此,如果其他 goroutine 在该操作仍在进行时请求相同的数据(相同的键),它们只会等待。然后,当第一个操作完成时,所有其他操作都会得到相同的结果,而无需再次运行该操作。
好了,说得够多了,让我们深入了解一下 singleflight 的实际工作原理:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
这里发生了什么:
我们正在模拟这样的情况:5 个 goroutine 几乎同时尝试获取相同的数据,间隔 60 毫秒。为了简单起见,我们使用随机数来模拟从数据库中获取的数据。
使用 singleflight.Group,我们确保只有第一个 goroutine 实际运行 fetchData(),其余的 goroutine 等待结果。
行 v, err, shared := g.Do("key-fetch-data", fetchData) 分配一个唯一的键 ("key-fetch-data") 来跟踪这些请求。因此,如果另一个 goroutine 请求相同的键,而第一个 goroutine 仍在获取数据,它会等待结果而不是开始新的调用。
一旦第一个调用完成,任何等待的 goroutine 都会得到相同的结果,正如我们在输出中看到的那样。虽然我们有 5 个 goroutine 请求数据,但 fetchData 只运行了两次,这是一个巨大的提升。
共享标志确认结果已在多个 goroutine 之间重用。
“但是为什么第一个 goroutine 的共享标志为 true?我以为只有等待的 goroutine 才会共享 == true?”
是的,如果您认为只有等待的 goroutine 应该共享 == true,这可能会感觉有点违反直觉。
问题是,g.Do 中的共享变量告诉您结果是否在多个调用者之间共享。它基本上是在说:“嘿,这个结果被多个调用者使用了。”这与谁运行该函数无关,它只是一个信号,表明结果在多个 goroutine 之间重用。
“我有缓存,为什么我需要单次飞行?”
简短的答案是:缓存和 singleflight 解决不同的问题,而且它们实际上可以很好地协同工作。
在使用外部缓存(如 Redis 或 Memcached)的设置中,singleflight 增加了额外的保护层,不仅为您的数据库,也为缓存本身。
此外,singleflight 有助于防止缓存未命中风暴(有时称为“缓存踩踏”)。
通常,当请求请求数据时,如果数据在缓存中,那就太好了 - 这是缓存命中。如果数据不在缓存中,则为缓存未命中。假设在重建缓存之前有 10,000 个请求同时到达系统,数据库可能会突然同时受到 10,000 个相同查询的冲击。
在此高峰期间,singleflight 确保这 10,000 个请求中只有一个真正到达数据库。
但是稍后,在内部实现部分,我们将看到 singleflight 使用全局锁来保护正在进行的调用的映射,这可能成为每个 goroutine 的单点争用。这可能会减慢速度,尤其是在处理高并发时。
下面的模型可能更适合具有多个 CPU 的机器:
在此设置中,我们仅在发生缓存未命中时使用 singleflight。
要使用 singleflight,您首先创建一个 Group 对象,它是跟踪链接到特定键的正在进行的函数调用的核心结构。
它有两个有助于防止重复调用的关键方法:
我们已经在演示中了解了如何使用 g.Do(),让我们看看如何使用经过修改的包装函数的 g.DoChan() :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
说实话,这里使用 DoChan() 与 Do() 相比并没有太大变化,因为我们仍在等待通道接收操作 (
DoChan() 的闪光点是当你想要启动一个操作并在不阻塞 goroutine 的情况下执行其他操作时。例如,您可以使用通道更干净地处理超时或取消:
package singleflight type Result struct { Val interface{} Err error Shared bool }
此示例还提出了您在现实场景中可能遇到的一些问题:
是的,singleflight 提供了一种使用 group.Forget(key) 方法来处理此类情况的方法,它可以让您放弃正在进行的执行。
Forget() 方法从跟踪正在进行的函数调用的内部映射中删除一个键。这有点像“使键无效”,因此如果您使用该键再次调用 g.Do(),它将像新请求一样执行该函数,而不是等待上一次执行完成。
让我们更新示例以使用 Forget() 并查看该函数实际被调用了多少次:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Goroutine 0 和 Goroutine 1 都使用相同的键(“key-fetch-data”)调用 Do(),它们的请求合并为一次执行,结果在两个 Goroutine 之间共享。
Goroutine 2,另一方面,在运行 Do() 之前调用 Forget()。这会清除与“key-fetch-data”相关的任何先前结果,因此它会触发该函数的新执行。
总而言之,虽然 singleflight 很有用,但它仍然可能存在一些边缘情况,例如:
如果您已经注意到我们讨论过的所有问题,让我们深入到下一部分来讨论 singleflight 的实际工作原理。
通过使用singleflight,你可能已经对它的内部工作原理有了一个基本的了解,singleflight的整个实现只有大约150行代码。
基本上,每个唯一的键都有一个管理其执行的结构。如果 goroutine 调用 Do() 并发现 key 已经存在,则该调用将被阻塞,直到第一次执行完成,结构如下:
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
这里使用了两个同步原语:
这里我们将重点关注 group.Do() 方法,因为另一个方法 group.DoChan() 的工作方式类似。 group.Forget() 方法也很简单,因为它只是从地图中删除键。
当你调用 group.Do() 时,它所做的第一件事就是锁定整个调用映射 (g.mu)。
“这对性能不是很不利吗?”
是的,它可能并不适合每种情况下的性能(总是先进行基准测试),因为 singleflight 锁定了整个密钥。如果您的目标是获得更好的性能或大规模工作,一个好的方法是分片或分发密钥。您可以将负载分散到多个组,而不是仅使用一个单一飞行组,有点像“多重飞行”
作为参考,请查看此存储库:shardedsingleflight。
现在,一旦获得锁,该组就会查看内部映射 (g.m),如果已经有对给定密钥的正在进行或已完成的调用。该地图跟踪任何正在进行或已完成的工作,并将键映射到相应的任务。
如果找到该键(另一个 goroutine 已经在运行该任务),我们只需增加一个计数器(c.dups)来跟踪重复请求,而不是开始新的调用。然后,goroutine 释放锁并通过在关联的 WaitGroup 上调用 call.wg.Wait() 来等待原始任务完成。
当原始任务完成时,这个 goroutine 会获取结果并避免再次运行该任务。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
如果没有其他 Goroutine 正在处理该键,则当前 Goroutine 负责执行该任务。
此时,我们创建一个新的调用对象,将其添加到映射中,并初始化其 WaitGroup。然后,我们解锁互斥体并继续通过辅助方法 g.doCall(c, key, fn) 自己执行任务。当任务完成时,任何等待的 goroutine 都会被 wg.Wait() 调用解除阻塞。
这里没什么太疯狂的,除了我们如何处理错误之外,还有三种可能的情况:
这是辅助方法 g.doCall() 中事情开始变得更加聪明的地方。
“等等,什么是runtime.Goexit()?”
在深入代码之前,让我快速解释一下,runtime.Goexit() 用于停止 goroutine 的执行。
当 goroutine 调用 Goexit() 时,它会停止,并且任何延迟函数仍然按照后进先出 (LIFO) 顺序运行,就像正常情况一样。它与恐慌类似,但有一些区别:
现在,这是一个有趣的怪癖(与我们的主题没有直接关系,但值得一提)。如果你在主协程中调用runtime.Goexit()(比如在main()内部),请检查一下:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
发生的情况是 Goexit() 终止了主 goroutine,但如果还有其他 goroutine 仍在运行,程序会继续运行,因为只要至少有一个 goroutine 处于活动状态,Go 运行时就会保持活动状态。然而,一旦没有剩下 goroutines,它就会因“no goroutine”错误而崩溃,这是一个有趣的小角落案例。
现在,回到我们的代码,如果runtime.Goexit()仅终止当前的goroutine并且无法被recover()捕获,我们如何检测它是否被调用?
关键在于,当调用runtime.Goexit()时,其后面的任何代码都不会被执行。
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
在上面的情况下,调用runtime.Goexit()之后,normalReturn = true这一行永远不会被执行。因此,在 defer 内部,我们可以检查 normalReturn 是否仍然为 false,以检测是否调用了特殊方法。
下一步是确定任务是否出现恐慌。为此,我们使用recover()作为正常返回,尽管singleflight中的实际代码有点微妙:
package singleflight type Result struct { Val interface{} Err error Shared bool }
这段代码不是直接在recover块内设置recovered = true,而是通过在recover()块之后将recovery设置为最后一行来获得一点奇特的效果。
那么,为什么这会起作用?
当调用runtime.Goexit()时,它会终止整个goroutine,就像panic()一样。然而,如果panic()被恢复,只有panic()和recover()之间的函数链被终止,而不是整个goroutine。
这就是为什么在包含recover()的defer之外设置recovered = true,它只在两种情况下执行:当函数正常完成时或当恐慌恢复时,但在调用runtime.Goexit()时不会执行。
接下来,我们将讨论如何处理每个案例。
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <-ch: if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) case <-time.After(50 * time.Millisecond): return fmt.Errorf("timeout waiting for result") } return nil }
如果任务在执行过程中发生紧急情况,则会捕获紧急情况并将其保存在 c.err 中作为紧急错误,其中包含紧急值和堆栈跟踪。 singleflight 捕捉到恐慌并优雅地清理,但它不会吞掉它,它会在处理其状态后重新抛出恐慌。
这意味着执行任务的 Goroutine(第一个开始执行操作的 Goroutine)会发生恐慌,并且所有其他等待结果的 Goroutine 也会发生恐慌。
由于这种恐慌发生在开发人员的代码中,因此我们有责任妥善处理它。
现在,我们仍然需要考虑一种特殊情况:当其他 goroutine 使用 group.DoChan() 方法并通过通道等待结果时。在这种情况下,singleflight 不能在这些 goroutine 中发生恐慌。相反,它会执行所谓的不可恢复的恐慌(gopanic(e)),这会使我们的应用程序崩溃。
最后,如果任务调用了runtime.Goexit(),则不需要采取任何进一步的操作,因为goroutine已经处于关闭过程中,我们只是让它发生而不干扰。
差不多就是这样,除了我们讨论过的特殊情况之外,没有什么太复杂的。
大家好,我是 Phuong Le,VictoriaMetrics 的软件工程师。上述写作风格注重清晰和简单,以易于理解的方式解释概念,即使它并不总是与学术精度完全一致。
如果您发现任何过时的内容或有疑问,请随时与我们联系。您可以在 X(@func25) 上给我留言。
您可能感兴趣的其他一些帖子:
如果您想监控您的服务、跟踪指标并了解一切的执行情况,您可能需要查看 VictoriaMetrics。这是一种快速、开源且节省成本的方式来监控您的基础设施。
我们是 Gophers,热爱研究、实验和分享 Go 及其生态系统知识的爱好者。
以上是Go Singleflight 融入您的代码中,而不是您的数据库中的详细内容。更多信息请关注PHP中文网其他相关文章!