原文发布在VictoriaMetrics博客上:https://victoriametrics.com/blog/go-singleflight/
这篇文章是关于 Go 中处理并发的系列文章的一部分:
- Gosync.Mutex:正常和饥饿模式
- Gosync.WaitGroup 和对齐问题
- Gosync.Pool 及其背后的机制
- Gosync.Cond,最被忽视的同步机制
- Gosync.Map:适合正确工作的正确工具
- Go Sync.Once 很简单...真的吗?
- Go Singleflight 融入您的代码,而不是您的数据库(我们在这里)
因此,当您同时收到多个请求相同的数据时,默认行为是每个请求都会单独访问数据库以获取相同的信息。这意味着您最终会多次执行相同的查询,老实说,这效率很低。
它最终会给数据库带来不必要的负载,这可能会减慢一切,但有一种方法可以解决这个问题。
这个想法是只有第一个请求实际上才会发送到数据库。其余请求等待第一个请求完成。一旦数据从初始请求返回,其他请求就会得到相同的结果 - 不需要额外的查询。
那么,现在您已经很清楚这篇文章的内容了,对吧?
单程航班
Go 中的 singleflight 包是专门为处理我们刚才讨论的问题而构建的。请注意,它不是标准库的一部分,但由 Go 团队维护和开发。
singleflight 的作用是确保只有一个 goroutine 实际运行该操作,例如从数据库获取数据。它只允许在任何给定时刻对同一条数据(称为“密钥”)执行一次“进行中”(正在进行的)操作。
因此,如果其他 goroutine 在该操作仍在进行时请求相同的数据(相同的键),它们只会等待。然后,当第一个操作完成时,所有其他操作都会得到相同的结果,而无需再次运行该操作。
好了,说得够多了,让我们深入了解一下 singleflight 的实际工作原理:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i <p>这里发生了什么:</p> <p>我们正在模拟这样的情况:5 个 goroutine 几乎同时尝试获取相同的数据,间隔 60 毫秒。为了简单起见,我们使用随机数来模拟从数据库中获取的数据。</p> <p>使用 singleflight.Group,我们确保只有第一个 goroutine 实际运行 fetchData(),其余的 goroutine 等待结果。</p> <p>行 v, err, shared := g.Do("key-fetch-data", fetchData) 分配一个唯一的键 ("key-fetch-data") 来跟踪这些请求。因此,如果另一个 goroutine 请求相同的键,而第一个 goroutine 仍在获取数据,它会等待结果而不是开始新的调用。</p> <figure><p><img src="/static/imghwm/default1.png" data-src="https://img.php.cn/upload/article/000/000/000/173078082880779.jpg?x-oss-process=image/resize,p_40" class="lazy" alt="Go Singleflight Melts in Your Code, Not in Your DB"></p> <figcaption>单次飞行演示</figcaption></figure> <p>一旦第一个调用完成,任何等待的 goroutine 都会得到相同的结果,正如我们在输出中看到的那样。虽然我们有 5 个 goroutine 请求数据,但 fetchData 只运行了两次,这是一个巨大的提升。</p> <p>共享标志确认结果已在多个 goroutine 之间重用。</p> <blockquote> <p><em>“但是为什么第一个 goroutine 的共享标志为 true?我以为只有等待的 goroutine 才会共享 == true?”</em></p> </blockquote> <p>是的,如果您认为只有等待的 goroutine 应该共享 == true,这可能会感觉有点违反直觉。</p> <p>问题是,g.Do 中的共享变量告诉您结果是否在多个调用者之间共享。它基本上是在说:“嘿,这个结果被多个调用者使用了。”这与谁运行该函数无关,它只是一个信号,表明结果在多个 goroutine 之间重用。</p> <blockquote> <p><em>“我有缓存,为什么我需要单次飞行?”</em></p> </blockquote> <p>简短的答案是:缓存和 singleflight 解决不同的问题,而且它们实际上可以很好地协同工作。</p> <p>在使用外部缓存(如 Redis 或 Memcached)的设置中,singleflight 增加了额外的保护层,不仅为您的数据库,也为缓存本身。</p> <figure><p><img src="/static/imghwm/default1.png" data-src="https://img.php.cn/upload/article/000/000/000/173078082988490.jpg?x-oss-process=image/resize,p_40" class="lazy" alt="Go Singleflight Melts in Your Code, Not in Your DB"></p> <figcaption>Singleflight 与缓存系统一起工作</figcaption></figure> <p>此外,singleflight 有助于防止缓存未命中风暴(有时称为“缓存踩踏”)。</p> <p>通常,当请求请求数据时,如果数据在缓存中,那就太好了 - 这是缓存命中。如果数据不在缓存中,则为缓存未命中。假设在重建缓存之前有 10,000 个请求同时到达系统,数据库可能会突然同时受到 10,000 个相同查询的冲击。</p><p>在此高峰期间,singleflight 确保这 10,000 个请求中只有一个真正到达数据库。</p> <p>但是稍后,在内部实现部分,我们将看到 singleflight 使用全局锁来保护正在进行的调用的映射,这可能成为每个 goroutine 的单点争用。这可能会减慢速度,尤其是在处理高并发时。</p> <p>下面的模型可能更适合具有多个 CPU 的机器:</p> <figure><p><img src="/static/imghwm/default1.png" data-src="https://img.php.cn/upload/article/000/000/000/173078083076065.jpg?x-oss-process=image/resize,p_40" class="lazy" alt="Go Singleflight Melts in Your Code, Not in Your DB"></p> <figcaption>缓存未命中时的单次飞行</figcaption></figure> <p>在此设置中,我们仅在发生缓存未命中时使用 singleflight。</p> <h3> 单次航班运营 </h3> <p>要使用 singleflight,您首先创建一个 Group 对象,它是跟踪链接到特定键的正在进行的函数调用的核心结构。 </p> <p>它有两个有助于防止重复调用的关键方法:</p>
- group.Do(key, func):运行函数,同时抑制重复请求。当您调用 Do 时,您传入一个键和一个函数,如果该键没有发生其他执行,则该函数将运行。如果同一个键已经有一个执行正在进行,您的调用将阻塞,直到第一个执行完成并返回相同的结果。
- group.DoChan(key, func):与 group.Do 类似,但它不是阻塞,而是为您提供一个通道(
我们已经在演示中了解了如何使用 g.Do(),让我们看看如何使用经过修改的包装函数的 g.DoChan() :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i <pre class="brush:php;toolbar:false">// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <p>说实话,这里使用 DoChan() 与 Do() 相比并没有太大变化,因为我们仍在等待通道接收操作 ( </p><p>DoChan() 的闪光点是当你想要启动一个操作并在不阻塞 goroutine 的情况下执行其他操作时。例如,您可以使用通道更干净地处理超时或取消:<br> </p> <pre class="brush:php;toolbar:false">package singleflight type Result struct { Val interface{} Err error Shared bool }
此示例还提出了您在现实场景中可能遇到的一些问题:
- 由于网络响应缓慢、数据库无响应等原因,第一个 Goroutine 可能会比预期花费更长的时间。在这种情况下,所有其他等待的 Goroutine 的卡住时间都会比您希望的要长。超时可以在这里提供帮助,但任何新请求仍然会在第一个请求之后等待。
- 您获取的数据可能会经常更改,因此当第一个请求完成时,结果可能已经过时。这意味着我们需要一种方法来使密钥无效并触发新的执行。
是的,singleflight 提供了一种使用 group.Forget(key) 方法来处理此类情况的方法,它可以让您放弃正在进行的执行。
Forget() 方法从跟踪正在进行的函数调用的内部映射中删除一个键。这有点像“使键无效”,因此如果您使用该键再次调用 g.Do(),它将像新请求一样执行该函数,而不是等待上一次执行完成。
让我们更新示例以使用 Forget() 并查看该函数实际被调用了多少次:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i <p>Goroutine 0 和 Goroutine 1 都使用相同的键(“key-fetch-data”)调用 Do(),它们的请求合并为一次执行,结果在两个 Goroutine 之间共享。</p> <p>Goroutine 2,另一方面,在运行 Do() 之前调用 Forget()。这会清除与“key-fetch-data”相关的任何先前结果,因此它会触发该函数的新执行。</p> <p>总而言之,虽然 singleflight 很有用,但它仍然可能存在一些边缘情况,例如:</p>
- 如果第一个 goroutine 被阻塞的时间太长,所有等待它的其他 goroutine 也会被卡住。在这种情况下,使用超时上下文或带有超时的 select 语句可能是更好的选择。
- 如果第一个请求返回错误或恐慌,相同的错误或恐慌将传播到等待结果的所有其他 goroutine。
如果您已经注意到我们讨论过的所有问题,让我们深入到下一部分来讨论 singleflight 的实际工作原理。
单次飞行如何运作
通过使用singleflight,你可能已经对它的内部工作原理有了一个基本的了解,singleflight的整个实现只有大约150行代码。
基本上,每个唯一的键都有一个管理其执行的结构。如果 goroutine 调用 Do() 并发现 key 已经存在,则该调用将被阻塞,直到第一次执行完成,结构如下:
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <p>这里使用了两个同步原语:</p>
- 组互斥锁 (g.mu):该互斥锁保护整个键映射,而不是每个键一个锁,它确保添加或删除键是线程安全的。
- WaitGroup (g.call.wg):WaitGroup 用于等待与特定键关联的第一个 goroutine 完成其工作。
这里我们将重点关注 group.Do() 方法,因为另一个方法 group.DoChan() 的工作方式类似。 group.Forget() 方法也很简单,因为它只是从地图中删除键。
当你调用 group.Do() 时,它所做的第一件事就是锁定整个调用映射 (g.mu)。
“这对性能不是很不利吗?”
是的,它可能并不适合每种情况下的性能(总是先进行基准测试),因为 singleflight 锁定了整个密钥。如果您的目标是获得更好的性能或大规模工作,一个好的方法是分片或分发密钥。您可以将负载分散到多个组,而不是仅使用一个单一飞行组,有点像“多重飞行”
作为参考,请查看此存储库:shardedsingleflight。
现在,一旦获得锁,该组就会查看内部映射 (g.m),如果已经有对给定密钥的正在进行或已完成的调用。该地图跟踪任何正在进行或已完成的工作,并将键映射到相应的任务。
如果找到该键(另一个 goroutine 已经在运行该任务),我们只需增加一个计数器(c.dups)来跟踪重复请求,而不是开始新的调用。然后,goroutine 释放锁并通过在关联的 WaitGroup 上调用 call.wg.Wait() 来等待原始任务完成。
当原始任务完成时,这个 goroutine 会获取结果并避免再次运行该任务。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i <p>如果没有其他 Goroutine 正在处理该键,则当前 Goroutine 负责执行该任务。</p> <p>此时,我们创建一个新的调用对象,将其添加到映射中,并初始化其 WaitGroup。然后,我们解锁互斥体并继续通过辅助方法 g.doCall(c, key, fn) 自己执行任务。当任务完成时,任何等待的 goroutine 都会被 wg.Wait() 调用解除阻塞。</p> <p>这里没什么太疯狂的,除了我们如何处理错误之外,还有三种可能的情况:</p>
- 如果函数发生恐慌,我们会捕获它,将其包装在一个恐慌错误中,然后引发恐慌。
- 如果函数返回 errGoexit,我们调用 runtime.Goexit() 来正确退出 goroutine。
- 如果这只是一个正常错误,我们会在调用时设置该错误。
这是辅助方法 g.doCall() 中事情开始变得更加聪明的地方。
“等等,什么是runtime.Goexit()?”
在深入代码之前,让我快速解释一下,runtime.Goexit() 用于停止 goroutine 的执行。
当 goroutine 调用 Goexit() 时,它会停止,并且任何延迟函数仍然按照后进先出 (LIFO) 顺序运行,就像正常情况一样。它与恐慌类似,但有一些区别:
- 它不会触发恐慌,所以你无法用recover()捕获它。
- 只有调用 Goexit() 的 goroutine 被终止,所有其他 goroutine 都保持正常运行。
现在,这是一个有趣的怪癖(与我们的主题没有直接关系,但值得一提)。如果你在主协程中调用runtime.Goexit()(比如在main()内部),请检查一下:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i <p>发生的情况是 Goexit() 终止了主 goroutine,但如果还有其他 goroutine 仍在运行,程序会继续运行,因为只要至少有一个 goroutine 处于活动状态,Go 运行时就会保持活动状态。然而,一旦没有剩下 goroutines,它就会因“no goroutine”错误而崩溃,这是一个有趣的小角落案例。</p> <p>现在,回到我们的代码,如果runtime.Goexit()仅终止当前的goroutine并且无法被recover()捕获,我们如何检测它是否被调用?</p> <p>关键在于,当调用runtime.Goexit()时,其后面的任何代码都不会被执行。<br> </p> <pre class="brush:php;toolbar:false">// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <p>在上面的情况下,调用runtime.Goexit()之后,normalReturn = true这一行永远不会被执行。因此,在 defer 内部,我们可以检查 normalReturn 是否仍然为 false,以检测是否调用了特殊方法。</p> <p>下一步是确定任务是否出现恐慌。为此,我们使用recover()作为正常返回,尽管singleflight中的实际代码有点微妙:<br> </p> <pre class="brush:php;toolbar:false">package singleflight type Result struct { Val interface{} Err error Shared bool }
这段代码不是直接在recover块内设置recovered = true,而是通过在recover()块之后将recovery设置为最后一行来获得一点奇特的效果。
那么,为什么这会起作用?
当调用runtime.Goexit()时,它会终止整个goroutine,就像panic()一样。然而,如果panic()被恢复,只有panic()和recover()之间的函数链被终止,而不是整个goroutine。
这就是为什么在包含recover()的defer之外设置recovered = true,它只在两种情况下执行:当函数正常完成时或当恐慌恢复时,但在调用runtime.Goexit()时不会执行。
接下来,我们将讨论如何处理每个案例。
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <p>如果任务在执行过程中发生紧急情况,则会捕获紧急情况并将其保存在 c.err 中作为紧急错误,其中包含紧急值和堆栈跟踪。 singleflight 捕捉到恐慌并优雅地清理,但它不会吞掉它,它会在处理其状态后重新抛出恐慌。 </p><p>这意味着执行任务的 Goroutine(第一个开始执行操作的 Goroutine)会发生恐慌,并且所有其他等待结果的 Goroutine 也会发生恐慌。</p> <p>由于这种恐慌发生在开发人员的代码中,因此我们有责任妥善处理它。</p> <p>现在,我们仍然需要考虑一种特殊情况:当其他 goroutine 使用 group.DoChan() 方法并通过通道等待结果时。在这种情况下,singleflight 不能在这些 goroutine 中发生恐慌。相反,它会执行所谓的<strong>不可恢复的恐慌</strong>(gopanic(e)),这会使我们的应用程序崩溃。</p> <p>最后,如果任务调用了runtime.Goexit(),则不需要采取任何进一步的操作,因为goroutine已经处于关闭过程中,我们只是让它发生而不干扰。</p> <p>差不多就是这样,除了我们讨论过的特殊情况之外,没有什么太复杂的。</p> <h2> 保持联系 </h2> <p>大家好,我是 Phuong Le,VictoriaMetrics 的软件工程师。上述写作风格注重清晰和简单,以易于理解的方式解释概念,即使它并不总是与学术精度完全一致。</p> <p>如果您发现任何过时的内容或有疑问,请随时与我们联系。您可以在 X(@func25) 上给我留言。</p> <p>您可能感兴趣的其他一些帖子:</p>
- Go I/O 读取器、写入器和动态数据。
- Go 数组如何工作以及如何使用 For-Range
- Go 中的切片:变大或回家
- Go Maps 解释:键值对实际上是如何存储的
- Golang Defer:从基础到陷阱
- 供应商,或 go mod 供应商:这是什么?
我们是谁
如果您想监控您的服务、跟踪指标并了解一切的执行情况,您可能需要查看 VictoriaMetrics。这是一种快速、开源且节省成本的方式来监控您的基础设施。
我们是 Gophers,热爱研究、实验和分享 Go 及其生态系统知识的爱好者。
以上是Go Singleflight 融入您的代码中,而不是您的数据库中的详细内容。更多信息请关注PHP中文网其他相关文章!

在Go中,使用互斥锁和锁是确保线程安全的关键。1)使用sync.Mutex进行互斥访问,2)使用sync.RWMutex处理读写操作,3)使用原子操作进行性能优化。掌握这些工具及其使用技巧对于编写高效、可靠的并发程序至关重要。

如何优化并发Go代码的性能?使用Go的内置工具如gotest、gobench和pprof进行基准测试和性能分析。1)使用testing包编写基准测试,评估并发函数的执行速度。2)通过pprof工具进行性能分析,识别程序中的瓶颈。3)调整垃圾收集设置以减少其对性能的影响。4)优化通道操作和限制goroutine数量以提高效率。通过持续的基准测试和性能分析,可以有效提升并发Go代码的性能。

避免并发Go程序中错误处理的常见陷阱的方法包括:1.确保错误传播,2.处理超时,3.聚合错误,4.使用上下文管理,5.错误包装,6.日志记录,7.测试。这些策略有助于有效处理并发环境中的错误。

IndimitInterfaceImplementationingingoembodiesducktybybyallowingTypestoSatoSatiSatiSatiSatiSatiSatsatSatiSatplicesWithouTexpliclIctDeclaration.1)itpromotesflemotesflexibility andmodularitybybyfocusingion.2)挑战挑战InclocteSincludeUpdatingMethodSignateSignatiSantTrackingImplections.3)工具li

在Go编程中,有效管理错误的方法包括:1)使用错误值而非异常,2)采用错误包装技术,3)定义自定义错误类型,4)复用错误值以提高性能,5)谨慎使用panic和recover,6)确保错误消息清晰且一致,7)记录错误处理策略,8)将错误视为一等公民,9)使用错误通道处理异步错误。这些做法和模式有助于编写更健壮、可维护和高效的代码。

在Go中实现并发可以通过使用goroutines和channels来实现。1)使用goroutines来并行执行任务,如示例中同时享受音乐和观察朋友。2)通过channels在goroutines之间安全传递数据,如生产者和消费者模式。3)避免过度使用goroutines和死锁,合理设计系统以优化并发程序。

Gooffersmultipleapproachesforbuildingconcurrentdatastructures,includingmutexes,channels,andatomicoperations.1)Mutexesprovidesimplethreadsafetybutcancauseperformancebottlenecks.2)Channelsofferscalabilitybutmayblockiffullorempty.3)Atomicoperationsareef

go'serrorhandlingisexplicit,治疗eRROSASRETRATERTHANEXCEPTIONS,与pythonandjava.1)go'sapphifeensuresererrawaresserrorawarenessbutcanleadtoverbosecode.2)pythonandjavauseexeexceptionseforforforforforcleanerCodebutmaymobisserrors.3)


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

MinGW - 适用于 Windows 的极简 GNU
这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

记事本++7.3.1
好用且免费的代码编辑器

Dreamweaver Mac版
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

SublimeText3 英文版
推荐:为Win版本,支持代码提示!