首页 >后端开发 >Golang >Go 中的概率提前过期

Go 中的概率提前过期

Mary-Kate Olsen
Mary-Kate Olsen原创
2024-09-29 06:19:02754浏览

关于缓存踩踏

我经常遇到需要缓存这个或那个的情况。通常,这些值会被缓存一段时间。您可能熟悉这种模式。您尝试从缓存中获取一个值,如果成功,则将其返回给调用者并结束。如果该值不存在,您将获取它(很可能从数据库中)或计算它并将其放入缓存中。在大多数情况下,这非常有效。但是,如果您用于缓存条目的密钥被频繁访问,并且计算数据的操作需要一段时间,您最终会遇到多个并行请求同时发生缓存未命中的情况。所有这些请求都将从源独立加载并将值存储在缓存中。这会导致资源浪费,甚至可能导致拒绝服务。

让我用一个例子来说明。我将使用 Redis 进行缓存,并在顶部使用一个简单的 Go http 服务器。完整代码如下:

package main

import (
    "errors"
    "log"
    "net/http"
    "time"

    "github.com/redis/go-redis/v9"
)

type handler struct {
    rdb *redis.Client
    cacheTTL time.Duration
}

func (ch *handler) simple(w http.ResponseWriter, r *http.Request) {
    cacheKey := "my_cache_key"
    // we'll use 200 to signify a cache hit & 201 to signify a miss
    responseCode := http.StatusOK
    cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result()
    if err != nil {
        if !errors.Is(err, redis.Nil) {
            log.Println("could not reach redis", err.Error())
            http.Error(w, "could not reach redis", http.StatusInternalServerError)
            return
        }

        // cache miss - fetch & store
        res := longRunningOperation()
        responseCode = http.StatusCreated

        err = ch.rdb.Set(r.Context(), cacheKey, res, ch.cacheTTL).Err()
        if err != nil {
            log.Println("failed to set cache value", err.Error())
            http.Error(w, "failed to set cache value", http.StatusInternalServerError)
            return
        }
        cachedData = res
    }
    w.WriteHeader(responseCode)
    _, _ = w.Write([]byte(cachedData))
}

func longRunningOperation() string {
    time.Sleep(time.Millisecond * 500)
    return "hello"
}

func main() {
    ttl := time.Second * 3
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    handler := &handler{
        rdb: rdb,
        cacheTTL: ttl,
    }

    http.HandleFunc("/simple", handler.simple)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Could not start server: %s\n", err.Error())
    }
}

让我们在 /simple 端点上施加一些负载,看看会发生什么。我将使用 vegeta 来做这个。

我运行 vegeta Attack -duration=30s -rate=500 -targets=./targets_simple.txt > res_simple.bin。 Vegeta 最终每秒发出 500 个请求,持续 30 秒。我将它们绘制为 HTTP 结果代码的直方图,每个桶的跨度为 100 毫秒。结果如下图。

Probabilistic Early Expiration in Go

当我们开始实验时,缓存是空的 - 我们没有存储任何值。当一堆请求到达我们的服务器时,我们得到了最初的踩踏。他们都检查缓存,没有发现任何内容,调用 longRunningOperation 并将其存储在缓存中。由于 longRunningOperation 大约需要 500 毫秒才能完成前 500 毫秒内发出的任何请求,最终都会调用 longRunningOperation。一旦其中一个请求设法将值存储在缓存中,所有后续请求都会从缓存中获取该值,我们开始看到状态代码为 200 的响应。然后,随着 Redis 上的过期机制启动,该模式每 3 秒重复一次。

在这个玩具示例中,这不会导致任何问题,但在生产环境中,这可能会导致系统不必要的负载、用户体验下降,甚至导致自我拒绝服务。那么我们怎样才能防止这种情况发生呢?嗯,有几种方法。我们可以引入锁——任何缓存未命中都会导致代码尝试实现锁。分布式锁定并不是一件简单的事情,而且它们通常具有需要微妙处理的微妙边缘情况。我们还可以使用后台作业定期重新计算该值,但这需要运行一个额外的进程,引入另一个需要在我们的代码中维护和监视的齿轮。如果您有动态缓存键,则此方法也可能不可行。还有另一种方法,称为概率提前过期,这是我想进一步探索的方法。

概率提前到期

这种技术允许人们根据概率重新计算值。从缓存中获取值时,您还可以根据概率计算是否需要重新生成缓存值。越接近现有价值到期,概率就越高。

我的具体实现基于 A. Vattani、F.Chierichetti 和 K. Lowenstein 在《最优概率缓存踩踏预防》中的 XFetch。

我将在 HTTP 服务器上引入一个新端点,该端点也将执行昂贵的计算,但这次在缓存时使用 XFetch。为了使 XFetch 工作,我们需要存储昂贵的操作花费了多长时间(增量)以及缓存键何时过期。为了实现这一目标,我将引入一个结构体来保存这些值以及消息本身:

type probabilisticValue struct {
    Message string
    Expiry time.Time
    Delta time.Duration
}

我添加了一个函数来用这些属性包装原始消息并将其序列化以存储在 redis 中:

func wrapMessage(message string, delta, cacheTTL time.Duration) (string, error) {
    bts, err := json.Marshal(probabilisticValue{
        Message: message,
        Delta: delta,
        Expiry: time.Now().Add(cacheTTL),
    })
    if err != nil {
        return "", fmt.Errorf("could not marshal message: %w", err)
    }

    return string(bts), nil
}

我们还编写一个方法来重新计算并将值存储在redis中:

func (ch *handler) recomputeValue(ctx context.Context, cacheKey string) (string, error) {
    start := time.Now()
    message := longRunningOperation()
    delta := time.Since(start)

    wrapped, err := wrapMessage(message, delta, ch.cacheTTL)
    if err != nil {
        return "", fmt.Errorf("could not wrap message: %w", err)
    }
    err = ch.rdb.Set(ctx, cacheKey, wrapped, ch.cacheTTL).Err()
    if err != nil {
        return "", fmt.Errorf("could not save value: %w", err)
    }
    return message, nil
}

为了确定是否需要根据概率更新值,我们可以在 probabilisticValue 中添加一个方法:

func (pv probabilisticValue) shouldUpdate() bool {
    // suggested default param in XFetch implementation
    // if increased - results in earlier expirations
    beta := 1.0
    now := time.Now()
    scaledGap := pv.Delta.Seconds() * beta * math.Log(rand.Float64())
    return now.Sub(pv.Expiry).Seconds() >= scaledGap
}

如果我们将其全部连接起来,我们最终会得到以下处理程序:

func (ch *handler) probabilistic(w http.ResponseWriter, r *http.Request) {
    cacheKey := "probabilistic_cache_key"
    // we'll use 200 to signify a cache hit & 201 to signify a miss
    responseCode := http.StatusOK
    cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result()
    if err != nil {
        if !errors.Is(err, redis.Nil) {
            log.Println("could not reach redis", err.Error())
            http.Error(w, "could not reach redis", http.StatusInternalServerError)
            return
        }

        res, err := ch.recomputeValue(r.Context(), cacheKey)
        if err != nil {
            log.Println("could not recompute value", err.Error())
            http.Error(w, "could not recompute value", http.StatusInternalServerError)
            return
        }
        responseCode = http.StatusCreated
        cachedData = res

        w.WriteHeader(responseCode)
        _, _ = w.Write([]byte(cachedData))
        return
    }

    pv := probabilisticValue{}
    err = json.Unmarshal([]byte(cachedData), &pv)
    if err != nil {
        log.Println("could not unmarshal probabilistic value", err.Error())
        http.Error(w, "could not unmarshal probabilistic value", http.StatusInternalServerError)
        return
    }

    if pv.shouldUpdate() {
        _, err := ch.recomputeValue(r.Context(), cacheKey)
        if err != nil {
            log.Println("could not recompute value", err.Error())
            http.Error(w, "could not recompute value", http.StatusInternalServerError)
            return
        }
        responseCode = http.StatusAccepted
    }

    w.WriteHeader(responseCode)
    _, _ = w.Write([]byte(cachedData))
}

处理程序的工作方式与第一个处理程序非常相似,但是,在获得缓存命中后,我们就掷骰子。根据结果​​,我们要么只返回刚刚获取的值,要么提前更新该值。

我们将使用 HTTP 状态代码来确定以下 3 种情况:

  • 200 - 我们从缓存返回值
  • 201 - 缓存未命中,没有值
  • 202 - 缓存命中,触发概率更新

这次我再次启动 vegeta 在新端点上运行,结果如下:

Probabilistic Early Expiration in Go

Gumpalan biru kecil di sana menunjukkan apabila kami sebenarnya telah mengemas kini nilai cache lebih awal. Kami tidak lagi melihat kehilangan cache selepas tempoh pemanasan awal. Untuk mengelakkan lonjakan awal, anda boleh pra-simpan nilai cache jika ini penting untuk kes penggunaan anda.

Jika anda ingin menjadi lebih agresif dengan caching anda dan memuat semula nilai dengan lebih kerap, anda boleh bermain dengan parameter beta. Begini rupa percubaan yang sama dengan param beta yang ditetapkan kepada 2:

Probabilistic Early Expiration in Go

Kami kini melihat kemas kini berkebarangkalian dengan lebih kerap.

Semuanya ini adalah teknik kecil yang kemas yang boleh membantu mengelakkan rempuhan cache. Perlu diingat, ini hanya berfungsi jika anda mengambil kunci yang sama secara berkala daripada cache - jika tidak, anda tidak akan melihat banyak manfaat.

Ada cara lain untuk menangani rempuhan cache? perasan kesilapan? Beritahu saya dalam ulasan di bawah!

以上是Go 中的概率提前过期的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn