私は、あれこれキャッシュする必要がある状況に陥ることがよくあります。多くの場合、これらの値は一定期間キャッシュされます。おそらくこのパターンに精通しているでしょう。キャッシュから値を取得しようとしましたが、成功した場合は、その値を呼び出し元に返して終了します。値が存在しない場合は、(おそらくデータベースから) 値を取得するか、計算してキャッシュに入れます。ほとんどの場合、これはうまく機能します。ただし、キャッシュ エントリに使用しているキーが頻繁にアクセスされ、データの計算操作に時間がかかる場合、複数の並列リクエストが同時にキャッシュ ミスを起こす状況に陥ります。これらのリクエストはすべて、ソースから独立してロードし、値をキャッシュに保存します。これによりリソースが無駄になり、サービス拒否につながる可能性もあります。
例を挙げて説明しましょう。キャッシュには redis を使用し、その上にシンプルな Go http サーバーを使用します。完全なコードは次のとおりです:
package main import ( "errors" "log" "net/http" "time" "github.com/redis/go-redis/v9" ) type handler struct { rdb *redis.Client cacheTTL time.Duration } func (ch *handler) simple(w http.ResponseWriter, r *http.Request) { cacheKey := "my_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } // cache miss - fetch & store res := longRunningOperation() responseCode = http.StatusCreated err = ch.rdb.Set(r.Context(), cacheKey, res, ch.cacheTTL).Err() if err != nil { log.Println("failed to set cache value", err.Error()) http.Error(w, "failed to set cache value", http.StatusInternalServerError) return } cachedData = res } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) } func longRunningOperation() string { time.Sleep(time.Millisecond * 500) return "hello" } func main() { ttl := time.Second * 3 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) handler := &handler{ rdb: rdb, cacheTTL: ttl, } http.HandleFunc("/simple", handler.simple) if err := http.ListenAndServe(":8080", nil); err != nil { log.Fatalf("Could not start server: %s\n", err.Error()) } }
/simple エンドポイントに負荷をかけて、何が起こるかを見てみましょう。これにはベジータを使用します。
ベジータ攻撃を実行します -duration=30s -rate=500 -targets=./targets_simple.txt > res_simple.bin。ベジータは、30 秒間毎秒 500 件のリクエストを行うことになります。それぞれ 100 ミリ秒にわたるバケットを含む HTTP 結果コードのヒストグラムとしてグラフ化します。結果は次のグラフです。
実験を開始すると、キャッシュは空です。そこには値が格納されていません。大量のリクエストがサーバーに到達すると、最初の殺到が起こります。それらはすべてキャッシュをチェックして何も見つからず、longRunningOperation を呼び出してキャッシュに保存します。 longRunningOperation が完了するまでに最大 500 ミリ秒かかるため、最初の 500 ミリ秒以内に行われたリクエストはすべて、longRunningOperation を呼び出すことになります。リクエストの 1 つがキャッシュに値を保存できると、後続のすべてのリクエストがその値をキャッシュから取得し、ステータス コード 200 のレスポンスが表示され始めます。その後、redis の有効期限メカニズムが開始されると、このパターンが 3 秒ごとに繰り返されます。
このおもちゃの例では、これは問題を引き起こしませんが、実稼働環境では、システムへの不必要な負荷、ユーザー エクスペリエンスの低下、さらには自己誘発的なサービス拒否につながる可能性があります。では、どうすればこれを防ぐことができるでしょうか?まあ、方法はいくつかあります。ロックを導入することもできます。キャッシュ ミスが発生すると、コードがロックを達成しようとします。分散ロックは簡単なことではなく、多くの場合、分散ロックには微妙な処理が必要な微妙なエッジケースが存在します。バックグラウンド ジョブを使用して値を定期的に再計算することもできますが、これには追加のプロセスを実行する必要があり、コード内で維持および監視する必要があるさらに別の歯車が導入されます。このアプローチは、動的キャッシュ キーがある場合にも実行できない可能性があります。確率的早期有効期限と呼ばれる別のアプローチがあり、これについてはさらに検討していきたいと考えています。
この手法を使用すると、確率に基づいて値を再計算できます。キャッシュから値をフェッチするときは、確率に基づいてキャッシュ値を再生成する必要があるかどうかも計算します。既存の価値の有効期限に近づくほど、確率は高くなります。
私は、Optimal Probabilistic Cache Stampede Prevention の A. Vattani、F.Chierichetti、K. Lowenstein による XFetch の具体的な実装に基づいています。
HTTP サーバーに新しいエンドポイントを導入します。これも負荷の高い計算を実行しますが、今回はキャッシュするときに XFetch を使用します。 XFetch が機能するには、高価な操作にかかった時間 (デルタ) とキャッシュ キーの有効期限がいつ切れるかを保存する必要があります。それを達成するために、これらの値とメッセージ自体を保持する構造体を導入します。
type probabilisticValue struct { Message string Expiry time.Time Delta time.Duration }
元のメッセージをこれらの属性でラップし、redis に保存するためにシリアル化する関数を追加します。
func wrapMessage(message string, delta, cacheTTL time.Duration) (string, error) { bts, err := json.Marshal(probabilisticValue{ Message: message, Delta: delta, Expiry: time.Now().Add(cacheTTL), }) if err != nil { return "", fmt.Errorf("could not marshal message: %w", err) } return string(bts), nil }
値を再計算して Redis に保存するメソッドも書きましょう:
func (ch *handler) recomputeValue(ctx context.Context, cacheKey string) (string, error) { start := time.Now() message := longRunningOperation() delta := time.Since(start) wrapped, err := wrapMessage(message, delta, ch.cacheTTL) if err != nil { return "", fmt.Errorf("could not wrap message: %w", err) } err = ch.rdb.Set(ctx, cacheKey, wrapped, ch.cacheTTL).Err() if err != nil { return "", fmt.Errorf("could not save value: %w", err) } return message, nil }
確率に基づいて値を更新する必要があるかどうかを判断するには、probabilisticValue にメソッドを追加します。
func (pv probabilisticValue) shouldUpdate() bool { // suggested default param in XFetch implementation // if increased - results in earlier expirations beta := 1.0 now := time.Now() scaledGap := pv.Delta.Seconds() * beta * math.Log(rand.Float64()) return now.Sub(pv.Expiry).Seconds() >= scaledGap }
すべてをフックすると、次のハンドラーが完成します:
func (ch *handler) probabilistic(w http.ResponseWriter, r *http.Request) { cacheKey := "probabilistic_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } res, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusCreated cachedData = res w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) return } pv := probabilisticValue{} err = json.Unmarshal([]byte(cachedData), &pv) if err != nil { log.Println("could not unmarshal probabilistic value", err.Error()) http.Error(w, "could not unmarshal probabilistic value", http.StatusInternalServerError) return } if pv.shouldUpdate() { _, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusAccepted } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) }
ハンドラーは最初のハンドラーとほぼ同じように動作しますが、キャッシュ ヒットを取得するとサイコロを振ります。結果に応じて、フェッチしたばかりの値を返すか、値を早めに更新します。
HTTP ステータス コードを使用して、次の 3 つのケースのどちらかを判断します。
今度は新しいエンドポイントに対して実行して vegeta を再度起動します。結果は次のとおりです。
そこにある小さな青い塊は、実際にキャッシュ値を早期に更新し終えた時期を示しています。最初のウォームアップ期間の後にキャッシュミスが発生することはなくなりました。ユースケースにとって重要な場合は、初期のスパイクを回避するために、キャッシュされた値を事前に保存できます。
キャッシュをより積極的に行い、値をより頻繁に更新したい場合は、ベータ パラメーターを使用してみてください。ベータパラメータを 2 に設定した場合の同じ実験は次のようになります:
確率的な更新がより頻繁に行われるようになりました。
全体として、これはキャッシュ スタンピードを回避するのに役立つちょっとした巧妙なテクニックです。ただし、これはキャッシュから同じキーを定期的に取得する場合にのみ機能することに注意してください。そうでない場合は、あまりメリットがありません。
キャッシュスタンピードに対処する別の方法はありますか?間違いに気づきましたか?以下のコメント欄でお知らせください!
以上がGo における確率的な早期有効期限切れの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。