>백엔드 개발 >Golang >Golang의 캐시 라이브러리 freecache에 대해 알아보는 기사 1개

Golang의 캐시 라이브러리 freecache에 대해 알아보는 기사 1개

青灯夜游
青灯夜游앞으로
2022-02-21 10:43:425843검색

이 글은 Golangcache를 이해하고 Golang의 캐시 라이브러리 freecache를 간단하게 소개하는 글이 모든 사람에게 도움이 되기를 바랍니다.

Golang의 캐시 라이브러리 freecache에 대해 알아보는 기사 1개

Go는 일반적으로 캐시 시나리오를 개발하기 위해 맵 또는 캐시 프레임워크를 사용합니다. 스레드 안전을 위해 sync.Map 또는 스레드 안전 캐시 프레임워크가 사용됩니다. sync.Map或线程安全的缓存框架。

缓存场景中如果数据量大于百万级别,需要特别考虑数据类型对于gc的影响(注意string类型底层是指针+Len+Cap,因此也算是指针类型),如果缓存key和value都是非指针类型的话就无需多虑了。【相关推荐:Go视频教程

但实际应用场景中,key和value是(包含)指针类型数据是很常见的,因此使用缓存框架需要特别注意其对gc影响,从是否对GC影响角度来看缓存框架大致分为2类:

  • 零GC开销:比如freecache或bigcache这种,底层基于ringbuf,减小指针个数;
  • 有GC开销:直接基于Map来实现的缓存框架。

对于map而言,gc时会扫描所有key/value键值对,如果其都是基本类型,那么gc便不会再扫描。

下面以freecache为例分析下其实现原理,代码示例如下:

func main() {
   cacheSize := 100 * 1024 * 1024
   cache := freecache.NewCache(cacheSize)

   for i := 0; i < N; i++ {
      str := strconv.Itoa(i)
      _ = cache.Set([]byte(str), []byte(str), 1)
   }

   now := time.Now()
   runtime.GC()
   fmt.Printf("freecache, GC took: %s\n", time.Since(now))
   _, _ = cache.Get([]byte("aa"))

   now = time.Now()
   for i := 0; i < N; i++ {
      str := strconv.Itoa(i)
      _, _ = cache.Get([]byte(str))
   }
   fmt.Printf("freecache, Get took: %s\n\n", time.Since(now))
}

1 初始化

freecache.NewCache会初始化本地缓存,size表示存储空间大小,freecache会初始化256个segment,每个segment是独立的存储单元,freecache加锁维度也是基于segment的,每个segment有一个ringbuf,初始大小为size/256。freecache号称零GC的来源就是其指针是固定的,只有512个,每个segment有2个,分别是rb和slotData(注意切片为指针类型)。

type segment struct {
   rb            RingBuf // ring buffer that stores data
   segId         int
   _             uint32  // 占位
   missCount     int64
   hitCount      int64
   entryCount    int64
   totalCount    int64      // number of entries in ring buffer, including deleted entries.
   totalTime     int64      // used to calculate least recent used entry.
   timer         Timer      // Timer giving current time
   totalEvacuate int64      // used for debug
   totalExpired  int64      // used for debug
   overwrites    int64      // used for debug
   touched       int64      // used for debug
   vacuumLen     int64      // up to vacuumLen, new data can be written without overwriting old data.
   slotLens      [256]int32 // The actual length for every slot.
   slotCap       int32      // max number of entry pointers a slot can hold.
   slotsData     []entryPtr // 索引指针
}

func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
    cache = new(Cache)
    for i := 0; i < segmentCount; i++ {
        cache.segments[i] = newSegment(size/segmentCount, i, timer)
    }
}
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
    seg.rb = NewRingBuf(bufSize, 0)
    seg.segId = segId
    seg.timer = timer
    seg.vacuumLen = int64(bufSize)
    seg.slotCap = 1
    seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每个slotData初始化256个单位大小
}

2 读写流程

freecache的key和value都是[]byte数组,使用时需要自行序列化和反序列化,如果缓存复杂对象不可忽略其序列化和反序列化带来的影响,首先看下Set流程:

_ = cache.Set([]byte(str), []byte(str), 1)

Set流程首先对key进行hash,hashVal类型uint64,其低8位segID对应segment数组,低8-15位表示slotId对应slotsData下标,高16位表示slotsData下标对应的[]entryPtr某个数据,这里需要查找操作。注意[]entryPtr数组大小为slotCap(初始为1),当扩容时会slotCap倍增。

每个segment对应一个lock(sync.Mutex),因此其能够支持较大并发量,而不像sync.Map只有一个锁。

func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
   hashVal := hashFunc(key)
   segID := hashVal & segmentAndOpVal // 低8位
   cache.locks[segID].Lock() // 加锁
   err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
   cache.locks[segID].Unlock()
}

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
   slotId := uint8(hashVal >> 8)
   hash16 := uint16(hashVal >> 16)
   slot := seg.getSlot(slotId)
   idx, match := seg.lookup(slot, hash16, key)

   var hdrBuf [ENTRY_HDR_SIZE]byte
   hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
   if match { // 有数据更新操作
      matchedPtr := &slot[idx]
      seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
      hdr.slotId = slotId
      hdr.hash16 = hash16
      hdr.keyLen = uint16(len(key))
      originAccessTime := hdr.accessTime
      hdr.accessTime = now
      hdr.expireAt = expireAt
      hdr.valLen = uint32(len(value))
      if hdr.valCap >= hdr.valLen {
         // 已存在数据value空间能存下此次value大小
         atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
         seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
         seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
         atomic.AddInt64(&seg.overwrites, 1)
         return
      }
      // 删除对应entryPtr,涉及到slotsData内存copy,ringbug中只是标记删除
      seg.delEntryPtr(slotId, slot, idx)
      match = false
      // increase capacity and limit entry len.
      for hdr.valCap < hdr.valLen {
         hdr.valCap *= 2
      }
      if hdr.valCap > uint32(maxKeyValLen-len(key)) {
         hdr.valCap = uint32(maxKeyValLen - len(key))
      }
   } else { // 无数据
      hdr.slotId = slotId
      hdr.hash16 = hash16
      hdr.keyLen = uint16(len(key))
      hdr.accessTime = now
      hdr.expireAt = expireAt
      hdr.valLen = uint32(len(value))
      hdr.valCap = uint32(len(value))
      if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
         hdr.valCap = 1
      }
   }
   
   // 数据实际长度为 ENTRY_HDR_SIZE=24 + key和value的长度    
   entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
   slotModified := seg.evacuate(entryLen, slotId, now)
   if slotModified {
      // the slot has been modified during evacuation, we need to looked up for the &#39;idx&#39; again.
      // otherwise there would be index out of bound error.
      slot = seg.getSlot(slotId)
      idx, match = seg.lookup(slot, hash16, key)
      // assert(match == false)
   }
   newOff := seg.rb.End()
   seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
   seg.rb.Write(hdrBuf[:])
   seg.rb.Write(key)
   seg.rb.Write(value)
   seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
   atomic.AddInt64(&seg.totalTime, int64(now))
   atomic.AddInt64(&seg.totalCount, 1)
   seg.vacuumLen -= entryLen
   return
}

seg.evacuate会评估ringbuf是否有足够空间存储key/value,如果空间不够,其会从空闲空间尾部后一位(也就是待淘汰数据的开始位置)开始扫描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果对应数据已被逻辑deleted或者已过期,那么该块内存可以直接回收,如果不满足回收条件,则将entry从环头调换到环尾,再更新entry的索引,如果这样循环5次还是不行,那么需要将当前oldHdrBuf回收以满足内存需要。

执行完seg.evacuate所需空间肯定是能满足的,然后就是写入索引和数据了,insertEntryPtr就是写入索引操作,当[]entryPtr中元素个数大于seg.slotCap(初始1)时,需要扩容操作,对应方法见seg.expand,这里不再赘述。

写入ringbuf就是执行rb.Write即可。

func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
   var oldHdrBuf [ENTRY_HDR_SIZE]byte
   consecutiveEvacuate := 0
   for seg.vacuumLen < entryLen {
      oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
      seg.rb.ReadAt(oldHdrBuf[:], oldOff)
      oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
      oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
      if oldHdr.deleted { // 已删除
         consecutiveEvacuate = 0
         atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
         atomic.AddInt64(&seg.totalCount, -1)
         seg.vacuumLen += oldEntryLen
         continue
      }
      expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
      leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
      if expired || leastRecentUsed || consecutiveEvacuate > 5 {
      // 可以回收
         seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff)
         if oldHdr.slotId == slotId {
            slotModified = true
         }
         consecutiveEvacuate = 0
         atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
         atomic.AddInt64(&seg.totalCount, -1)
         seg.vacuumLen += oldEntryLen
         if expired {
            atomic.AddInt64(&seg.totalExpired, 1)
         } else {
            atomic.AddInt64(&seg.totalEvacuate, 1)
         }
      } else {
         // evacuate an old entry that has been accessed recently for better cache hit rate.
         newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
         seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
         consecutiveEvacuate++
         atomic.AddInt64(&seg.totalEvacuate, 1)
      }
   }
}

freecache的Get流程相对来说简单点,通过hash找到对应segment,通过slotId找到对应索引slot,然后通过二分+遍历寻找数据,如果找不到直接返回ErrNotFound,否则更新一些time指标。Get流程还会更新缓存命中率相关指标。

func (cache *Cache) Get(key []byte) (value []byte, err error) {
   hashVal := hashFunc(key)
   segID := hashVal & segmentAndOpVal
   cache.locks[segID].Lock()
   value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
   cache.locks[segID].Unlock()
   return
}
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
   hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找
   if err != nil {
      return
   }
   expireAt = hdr.expireAt
   if cap(buf) >= int(hdr.valLen) {
      value = buf[:hdr.valLen]
   } else {
      value = make([]byte, hdr.valLen)
   }

   seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
}

定位到数据之后,读取ringbuf即可,注意一般来说读取到的value是新创建的内存空间,因此涉及到[]byte

캐시 시나리오의 데이터 양이 100만 레벨보다 큰 경우 데이터 유형이 gc에 미치는 영향을 특별히 고려해야 합니다(문자열 유형의 맨 아래 레이어는 포인터 + Len + Cap, 따라서 포인터 유형이기도 합니다.) 캐시 키와 값이 모두 포인터 유형이 아닌 경우 걱정할 필요가 없습니다. [관련 추천:

Go 동영상 튜토리얼

]

그러나 실제 응용 시나리오에서는 키와 값이 포인터 유형의 데이터인(포함) 경우가 매우 일반적이므로 캐시 프레임워크를 사용할 때 특별한 주의가 필요합니다. GC에 미치는 영향, 관점에서 보면 캐시 프레임워크는 크게 두 가지 범주로 나뉩니다.

  • GC 오버헤드 없음: freecache 또는 bigcache와 같은 하위 계층은 ringbuf를 기반으로 합니다. 포인터 수 감소
  • GC 오버헤드 포함: Map을 기반으로 직접 구현된 캐싱 프레임워크.
맵의 경우 모든 키/값 쌍이 gc 중에 스캔됩니다. 모두 기본 유형인 경우 gc는 이를 다시 스캔하지 않습니다.

다음은 freecache를 예로 들어 구현 원리를 분석한 것입니다. 코드 예는 다음과 같습니다. rrreee

1 초기화🎜freecache.NewCache는 로컬 캐시, 크기는 저장소를 나타냅니다. 공간 크기와 관련하여 freecache는 256개의 세그먼트를 초기화합니다. 각 세그먼트는 독립적인 저장 단위입니다. freecache의 잠금 크기도 세그먼트를 기반으로 합니다. 각 세그먼트에는 초기 크기가 256인 ringbuf가 있습니다. . Freecache가 GC가 0이라고 주장하는 이유는 포인터가 512개로 고정되어 있고 각 세그먼트에 rb와 SlotData라는 2개가 있기 때문입니다(슬라이스는 포인터 유형입니다). 🎜rrreee

2 읽기 및 쓰기 프로세스🎜🎜프리캐시 키와 값은 모두 []byte 배열이므로 자체적으로 직렬화 및 역직렬화해야 합니다. 복잡한 개체를 캐시하는 경우 직렬화 및 역직렬화의 영향을 무시할 수 없습니다. 먼저 Set 프로세스를 살펴보세요. 🎜rrreee🎜Set 프로세스는 먼저 키를 해시하며 hashVal 유형은 uint64입니다. 하위 8비트 segID는 세그먼트 배열에 해당하고, 하위 8~15비트는 SlotId가 SlotData 첨자에 해당함을 나타내고, 상위 16비트는 SlotData 첨자에 해당하는 []entryPtr 데이터를 나타냅니다. . 여기에서 검색 작업이 필요합니다. []entryPtr 배열의 크기는 SlotCap(초기 1)입니다. 확장하면 SlotCap이 두 배가 됩니다. 🎜
🎜 각 세그먼트는 잠금(sync.Mutex)에 해당하므로 잠금이 하나만 있는 sync.Map과 달리 많은 양의 동시성을 지원할 수 있습니다. 🎜
rrreee🎜seg.evacuate는 ringbuf에 키/값을 저장할 공간이 충분한지 평가합니다. 공간이 충분하지 않으면 여유 공간의 끝(즉, 시작 위치)부터 스캔을 시작합니다. 제거할 데이터) (oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()), 해당 데이터가 논리적으로 삭제되거나 만료된 경우 메모리 블록을 직접 재활용할 수 있습니다. 그렇지 않은 경우 재활용 조건이 충족되면 항목이 링의 헤드에서 링의 끝까지 교체된 다음 이 루핑의 경우 항목의 인덱스가 업데이트됩니다. 시간이 여전히 작동하지 않으면 현재 oldHdrBuf를 재활용하여 메모리 요구 사항을 충족해야 합니다. 🎜🎜seg.evacuate 실행 후 필요한 공간이 충분해야 인덱스와 데이터가 기록됩니다. insertEntryPtr은 []entryPtr의 요소 수가 seg보다 클 때 쓰기 작업입니다. .slotCap(초기 1)인 경우 확장 작업이 필요한 경우 해당 메서드는 seg.expand를 참조하세요. 이에 대해서는 여기서 다시 설명하지 않습니다. 🎜🎜ringbuf에 쓰려면 rb.Write를 실행하면 됩니다. 🎜rrreee🎜Freecache의 Get 프로세스는 해시를 통해 해당 세그먼트를 찾고, SlotId를 통해 해당 인덱스 슬롯을 찾은 다음, 바이너리 + 순회를 통해 데이터를 찾고, 발견되지 않으면 ErrNotFound를 직접 반환하고, 그렇지 않으면 일부 시간 표시기를 업데이트합니다. . Get 프로세스는 캐시 적중률 관련 표시기도 업데이트합니다. 🎜rrreee🎜데이터를 찾은 후 ringbuf를 읽으세요. 일반적으로 읽은 값은 새로 생성된 메모리 공간이므로 []byte 데이터의 복사 작업이 포함됩니다. 🎜🎜3 요약🎜🎜 여러 일반적인 캐시 프레임워크의 스트레스 테스트 성능을 보면 Set 성능 차이는 크지만 정량적 수준에서는 Get 성능 차이가 크지 않으므로 대부분의 시나리오에서는 그럴 필요가 없습니다. Set./Get 성능에 너무 많은 관심을 기울이면 기능이 비즈니스 요구 사항과 gc 영향을 충족하는지 여부에 초점을 맞춰야 합니다. 성능 스트레스 테스트 비교는 https://golang2.eddycjy.com/posts/ch5/04-를 참조하세요. Performance/🎜🎜Cache에는 메모리의 모든 데이터를 캐시하는 특별한 시나리오가 있습니다. 업데이트 시 크기가 제대로 설정되지 않은 경우 Freecache가 사용됩니다. , 일부 데이터가 삭제될 수 있으며 이는 기대에 부합하지 않습니다. 이 문제를 피하기 위해 프리캐시를 사용하려면 크기를 "충분히 크게" 설정해야 하지만 메모리 공간 사용량에도 주의를 기울여야 합니다. 🎜🎜더 많은 프로그래밍 관련 지식을 보려면 🎜프로그래밍 교육🎜을 방문하세요! ! 🎜

위 내용은 Golang의 캐시 라이브러리 freecache에 대해 알아보는 기사 1개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제