元の記事は VictoriaMetrics ブログに掲載されています: https://victoriametrics.com/blog/go-singleflight/
この投稿は、Go での同時実行性の処理に関するシリーズの一部です:
そのため、同じデータを要求する複数のリクエストが同時に受信されると、デフォルトの動作では、それらのリクエストはそれぞれ個別にデータベースに送信され、同じ情報を取得します。 。つまり、同じクエリを何度も実行することになり、正直に言うと非効率です。
データベースに不必要な負荷がかかり、すべてが遅くなる可能性がありますが、これを回避する方法があります。
その考え方は、最初のリクエストだけが実際にデータベースに送信されるということです。残りのリクエストは、最初のリクエストが完了するまで待機します。最初のリクエストからデータが返されると、他のリクエストでも同じ結果が得られるだけで、追加のクエリは必要ありません。
それで、この投稿の内容についてはかなり理解できましたね?
Go の singleflight パッケージは、今説明したことを正確に処理するために特別に構築されています。念のため言っておきますが、これは標準ライブラリの一部ではありませんが、Go チームによって保守および開発されています。
singleflight が行うことは、データベースからデータを取得するなどの操作を実際に実行するゴルーチンのうち 1 つだけを確実に実行することです。これにより、同じデータ部分 (「キー」と呼ばれる) に対して、常に 1 つの「実行中」(進行中の) 操作のみが許可されます。
したがって、その操作の実行中に他のゴルーチンが同じデータ (同じキー) を要求した場合、それらはただ待つだけになります。その後、最初の操作が完了すると、操作を再度実行することなく、他のすべての操作で同じ結果が得られます。
さて、話はこれくらいにして、singleflight が実際にどのように動作するかを簡単なデモで見てみましょう。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
ここで何が起こっているのか:
5 つのゴルーチンが 60 ミリ秒の間隔でほぼ同時に同じデータをフェッチしようとする状況をシミュレートしています。シンプルにするために、データベースから取得したデータを模倣するために乱数を使用しています。
singleflight.Group を使用すると、最初のゴルーチンだけが実際に fetchData() を実行し、残りは結果を待つようになります。
行 v、err、shared := g.Do("key-fetch-data", fetchData) は、これらのリクエストを追跡するために一意のキー ("key-fetch-data") を割り当てます。したがって、最初のゴルーチンがまだデータをフェッチしている間に、別のゴルーチンが同じキーを要求した場合、新しい呼び出しを開始するのではなく、結果を待ちます。
最初の呼び出しが完了すると、出力に見られるように、待機中のゴルーチンは同じ結果を取得します。データを要求するゴルーチンが 5 つありましたが、fetchData は 2 回しか実行されませんでした。これは大幅な向上です。
共有フラグは、結果が複数のゴルーチン間で再利用されたことを確認します。
「しかし、最初のゴルーチンの共有フラグが true なのはなぜですか? 待機しているゴルーチンだけが == true を共有すると思ったのですが?」
はい、待機中のゴルーチンだけが == true を共有する必要があると考えている場合、これは少し直観に反するように感じるかもしれません。
実際のところ、g.Do の共有変数は、結果が複数の呼び出し元間で共有されたかどうかを示します。これは基本的に、「この結果は複数の発信者によって使用されました」と言っていることになります。これは関数を実行した人に関するものではなく、結果が複数のゴルーチン間で再利用されたことを示す信号です。
「キャッシュがあるのに、なぜシングルフライトが必要なのですか?」
簡単に言うと、キャッシュとシングルフライトはさまざまな問題を解決し、実際には非常にうまく連携して機能します。
外部キャッシュ (Redis や Memcached など) を使用したセットアップでは、singleflight により、データベースだけでなくキャッシュ自体にも追加の保護層が追加されます。
さらに、singleflight はキャッシュ ミス ストーム (「キャッシュ スタンピード」と呼ばれることもあります) からの保護にも役立ちます。
通常、リクエストがデータを要求するとき、データがキャッシュ内にあれば問題ありません。それはキャッシュ ヒットです。データがキャッシュにない場合、それはキャッシュミスです。キャッシュが再構築される前に 10,000 件のリクエストが一度にシステムにヒットしたと仮定すると、データベースは同時に 10,000 件の同一のクエリで突然停止する可能性があります。
このピーク時、singleflight は、10,000 件のリクエストのうち実際にデータベースにヒットするのは 1 つだけであることを保証します。
しかし、後の内部実装セクションでは、singleflight がグローバル ロックを使用して、すべてのゴルーチンの単一の競合点になる可能性があるインフライト呼び出しのマップを保護することがわかります。これにより、特に高い同時実行性を処理している場合、処理が遅くなる可能性があります。
以下のモデルは、複数の CPU を搭載したマシンでより適切に動作する可能性があります:
このセットアップでは、キャッシュミスが発生した場合にのみシングルフライトを使用します。
singleflight を使用するには、まず Group オブジェクトを作成します。これは、特定のキーにリンクされた進行中の関数呼び出しを追跡する中心的な構造です。
重複呼び出しを防ぐための 2 つの主要なメソッドがあります。
g.Do() の使用方法はデモですでに説明しました。変更されたラッパー関数で g.DoChan() を使用する方法を確認してみましょう。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
正直に言うと、ここでの DoChan() の使用は、Do() と比べてあまり変わりません。これは、基本的に同じことをブロックしているチャネル受信操作 (
DoChan() が威力を発揮するのは、ゴルーチンをブロックせずに操作を開始して他の作業を実行したい場合です。たとえば、次のチャネルを使用すると、タイムアウトやキャンセルをより適切に処理できます。
package singleflight type Result struct { Val interface{} Err error Shared bool }
この例では、実際のシナリオで遭遇する可能性のあるいくつかの問題も引き起こします。
はい、singleflight は、進行中の実行を破棄できる group.Forget(key) メソッドを使用してこのような状況を処理する方法を提供します。
Forget() メソッドは、進行中の関数呼び出しを追跡する内部マップからキーを削除します。これはキーを「無効にする」ようなものなので、そのキーを使用して g.Do() を再度呼び出すと、前回の実行が終了するのを待つのではなく、あたかも新しいリクエストであるかのように関数が実行されます。
Forget() を使用するように例を更新し、関数が実際に何回呼び出されるかを確認してみましょう。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Goroutine 0 と Goroutine 1 はどちらも同じキー (「key-fetch-data」) で Do() を呼び出し、それらのリクエストは 1 つの実行に結合され、結果は 2 つのゴルーチン間で共有されます。
一方、Goroutine 2 は、Do() を実行する前に Forget() を呼び出します。これにより、「key-fetch-data」に関連付けられた以前の結果がすべてクリアされるため、関数の新しい実行がトリガーされます。要約すると、singleflight は便利ですが、次のような特殊なケースも発生する可能性があります。
シングルフライトの仕組み
基本的に、すべての一意のキーは、その実行を管理する構造体を取得します。ゴルーチンが Do() を呼び出し、キーがすでに存在していることが判明した場合、その呼び出しは最初の実行が完了するまでブロックされます。構造は次のとおりです。
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }ここでは 2 つの同期プリミティブが使用されています:
もう 1 つのメソッド group.DoChan() も同様に機能するため、ここでは group.Do() メソッドに焦点を当てます。 group.Forget() メソッドも、マップからキーを削除するだけなので簡単です。
group.Do() を呼び出すと、最初に呼び出しのマップ全体 (g.mu) がロックされます。
「それってパフォーマンス悪くないですか?」
そうですね、singleflight はキー全体をロックするため、あらゆる場合にパフォーマンスが理想的であるとは限りません (常に最初にベンチマークを行うことをお勧めします)。パフォーマンスの向上を目指している場合、または大規模な作業を行っている場合は、キーをシャーディングまたは分散することをお勧めします。単一のフライト グループを 1 つだけ使用する代わりに、「マルチフライト」を実行するのと同じように、複数のグループに負荷を分散できます
参考として、このリポジトリ shardedsingleflight をチェックしてください。
ロックを取得すると、指定されたキーに対して進行中または完了した呼び出しがすでに存在する場合、グループは内部マップ (g.m) を調べます。このマップは、進行中または完了した作業を追跡し、キーは対応するタスクにマッピングされます。
キーが見つかった場合 (別の goroutine がすでにタスクを実行している場合)、新しい呼び出しを開始する代わりに、カウンター (c.dups) をインクリメントして重複したリクエストを追跡します。その後、ゴルーチンはロックを解放し、関連付けられた WaitGroup で call.wg.Wait() を呼び出して元のタスクが完了するのを待ちます。
元のタスクが完了すると、このゴルーチンは結果を取得し、タスクの再実行を回避します。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
他の goroutine がそのキーを処理していない場合、現在の goroutine がタスクの実行を担当します。
この時点で、新しい呼び出しオブジェクトを作成し、それをマップに追加し、その WaitGroup を初期化します。次に、ミューテックスのロックを解除し、ヘルパー メソッド g.doCall(c, key, fn) を介して自分自身でタスクの実行を開始します。タスクが完了すると、待機中のゴルーチンはすべて wg.Wait() 呼び出しによってブロック解除されます。
ここでは、エラーの処理方法を除いて、あまりワイルドなものはありません。考えられるシナリオは 3 つあります。
ここから、ヘルパー メソッド g.doCall() がもう少し賢くなり始めます。
「待って、runtime.Goexit() とは何ですか?」
コードの説明に入る前に、runtime.Goexit() は goroutine の実行を停止するために使用されます。
ゴルーチンが Goexit() を呼び出すと、ゴルーチンは停止しますが、遅延関数は通常と同様に後入れ先出し (LIFO) 順序で実行されます。これはパニックに似ていますが、いくつかの違いがあります。
さて、ここに興味深い癖があります (私たちのトピックとは直接関係ありませんが、言及する価値はあります)。メインの goroutine (main() 内など) で runtime.Goexit() を呼び出す場合は、次を確認してください。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
何が起こるかというと、Goexit() はメインの goroutine を終了しますが、まだ実行中の他の goroutine がある場合、少なくとも 1 つの goroutine がアクティブである限り Go ランタイムは生き続けるため、プログラムは続行されます。ただし、ゴルーチンがなくなると、「ゴルーチンがありません」エラーでクラッシュします。これは、ちょっとした面白いケースです。
ここでコードに戻りますが、runtime.Goexit() が現在の goroutine を終了するだけで、recover() でキャッチできない場合、それが呼び出されているかどうかをどのように検出すればよいでしょうか?
重要なのは、runtime.Goexit() が呼び出されたとき、それ以降のコードは実行されないという事実にあります。
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
上記の場合、runtime.Goexit() の呼び出し後に、normalReturn = true という行が実行されることはありません。したがって、defer 内で、normalReturn がまだ false であるかどうかをチェックして、特別なメソッドが呼び出されたことを検出できます。
次のステップは、タスクがパニック状態にあるかどうかを判断することです。そのために、通常の戻り値としてrecover()を使用しますが、singleflightの実際のコードはもう少し微妙です:
package singleflight type Result struct { Val interface{} Err error Shared bool }
このコードは、recover ブロック内で直接 Recovery = true を設定するのではなく、最後の行として、recover() ブロックの後に Recovery を設定することで、少し派手になっています。
では、なぜこれが機能するのでしょうか?
runtime.Goexit() が呼び出されると、panic() と同様に goroutine 全体が終了します。ただし、panic() がリカバリされた場合、ゴルーチン全体ではなく、panic() とcover() の間の関数チェーンのみが終了します。
そのため、recovered = true は、recover() を含む defer の外側で設定され、関数が正常に完了したとき、またはパニックが回復したときの 2 つの場合にのみ実行されますが、runtime.Goexit() が呼び出されたときは実行されません。
今後、各ケースがどのように処理されるかについて説明します。
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <-ch: if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) case <-time.After(50 * time.Millisecond): return fmt.Errorf("timeout waiting for result") } return nil }
タスクの実行中にパニックが発生した場合、パニックは捕らえられ、c.err に PanicError として保存されます。これには、パニック値とスタック トレースの両方が保持されます。 singleflight はパニックをキャッチして正常にクリーンアップしますが、それを飲み込むのではなく、その状態を処理した後にパニックを再スローします。
つまり、タスクを実行しているゴルーチン (操作を開始する最初のゴルーチン) でパニックが発生し、結果を待っている他のすべてのゴルーチンもパニックになります。
このパニックは開発者のコードで発生するため、適切に対処するのは私たちの責任です。
ここで、考慮する必要がある特殊なケースがまだあります。それは、他のゴルーチンが group.DoChan() メソッドを使用し、チャネル経由で結果を待っている場合です。この場合、singleflight はそれらのゴルーチンでパニックを起こすことはできません。代わりに、回復不能なパニック (go Panic(e)) と呼ばれるものが発生し、アプリケーションがクラッシュします。
最後に、タスクが runtime.Goexit() を呼び出した場合、ゴルーチンはすでにシャットダウン処理中であるため、それ以上のアクションを実行する必要はなく、干渉せずにそのまま終了します。
これでほぼ終わりです。これまで説明してきた特殊なケースを除いて、それほど複雑なことはありません。
こんにちは、私は VictoriaMetrics のソフトウェア エンジニア、Phuong Le です。上記の文体は、必ずしも学術的な正確さと完全に一致しているわけではないとしても、明快さと簡潔さに重点を置き、理解しやすい方法で概念を説明しています。
古いものを見つけた場合、またはご質問がある場合は、遠慮なくご連絡ください。 X(@func25)にDMを送ってください。
興味があるかもしれないその他の投稿:
サービスを監視し、メトリクスを追跡し、すべてがどのように実行されるかを確認したい場合は、VictoriaMetrics をチェックしてみるとよいでしょう。これは、インフラストラクチャを監視するための高速、オープンソース、コスト削減の方法です。
私たちは Gopher であり、Go とそのエコシステムに関する研究、実験、知識の共有が大好きな愛好家です。
以上がGo Singleflight は DB ではなくコードに溶け込みますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。