From c8408f70ea4d9cbe38f7a7008f69ea79a6510d26 Mon Sep 17 00:00:00 2001 From: jb0n Date: Fri, 9 Feb 2018 18:30:40 -0800 Subject: [PATCH 1/2] fixes for various race conditions --- cache.go | 19 ++++++++++------ cache_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++ segment.go | 48 +++++++++++++++++++-------------------- 3 files changed, 99 insertions(+), 31 deletions(-) diff --git a/cache.go b/cache.go index 0db6197..877e924 100644 --- a/cache.go +++ b/cache.go @@ -8,8 +8,12 @@ import ( "github.com/cespare/xxhash" ) +const ( + minBufSize = 512 * 1024 +) + type Cache struct { - locks [256]sync.Mutex + locks [256]sync.RWMutex segments [256]segment } @@ -22,8 +26,8 @@ func hashFunc(data []byte) uint64 { // `debug.SetGCPercent()`, set it to a much smaller value // to limit the memory consumption and GC pause time. func NewCache(size int) (cache *Cache) { - if size < 512*1024 { - size = 512 * 1024 + if size < minBufSize { + size = minBufSize } cache = new(Cache) for i := 0; i < 256; i++ { @@ -48,9 +52,9 @@ func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { func (cache *Cache) Get(key []byte) (value []byte, err error) { hashVal := hashFunc(key) segId := hashVal & 255 - cache.locks[segId].Lock() + cache.locks[segId].RLock() value, _, err = cache.segments[segId].get(key, hashVal) - cache.locks[segId].Unlock() + cache.locks[segId].RUnlock() return } @@ -58,9 +62,9 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) { func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) { hashVal := hashFunc(key) segId := hashVal & 255 - cache.locks[segId].Lock() + cache.locks[segId].RLock() value, expireAt, err = cache.segments[segId].get(key, hashVal) - cache.locks[segId].Unlock() + cache.locks[segId].RUnlock() return } @@ -176,6 +180,7 @@ func (cache *Cache) OverwriteCount() (overwriteCount int64) { return } +//Clear is not threadsafe. func (cache *Cache) Clear() { for i := 0; i < 256; i++ { cache.locks[i].Lock() diff --git a/cache_test.go b/cache_test.go index dce5b95..1dd4e28 100644 --- a/cache_test.go +++ b/cache_test.go @@ -5,7 +5,9 @@ import ( "crypto/rand" "encoding/binary" "fmt" + mrand "math/rand" "strings" + "sync" "testing" "time" ) @@ -380,6 +382,67 @@ func TestIterator(t *testing.T) { } } +func TestRace(t *testing.T) { + cache := NewCache(minBufSize) + inUse := 8 + wg := sync.WaitGroup{} + var iters int64 = 1000 + + wg.Add(5) + addFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + err := cache.SetInt(int64(mrand.Intn(inUse)), []byte("abc"), 1) + if err != nil { + t.Errorf("err: %s", err) + } + } + wg.Done() + } + getFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + _, _ = cache.GetInt(int64(mrand.Intn(inUse))) //it will likely error w/ delFunc running too + } + wg.Done() + } + delFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.DelInt(int64(mrand.Intn(inUse))) + } + wg.Done() + } + evacFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + _ = cache.EvacuateCount() + _ = cache.ExpiredCount() + _ = cache.EntryCount() + _ = cache.AverageAccessTime() + _ = cache.HitCount() + _ = cache.LookupCount() + _ = cache.HitRate() + _ = cache.OverwriteCount() + } + wg.Done() + } + resetFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.ResetStatistics() + } + wg.Done() + } + + go addFunc() + go getFunc() + go delFunc() + go evacFunc() + go resetFunc() + wg.Wait() +} + func BenchmarkCacheSet(b *testing.B) { cache := NewCache(256 * 1024 * 1024) var key [8]byte diff --git a/segment.go b/segment.go index f86397a..fdedb4e 100644 --- a/segment.go +++ b/segment.go @@ -2,6 +2,7 @@ package freecache import ( "errors" + "sync/atomic" "time" "unsafe" ) @@ -98,10 +99,10 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e hdr.valLen = uint32(len(value)) if hdr.valCap >= hdr.valLen { //in place overwrite - seg.totalTime += int64(hdr.accessTime) - int64(originAccessTime) + atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) - seg.overwrites++ + atomic.AddInt64(&seg.overwrites, 1) return } // avoid unnecessary memory copy. @@ -142,8 +143,8 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e seg.rb.Write(key) seg.rb.Write(value) seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) - seg.totalTime += int64(now) - seg.totalCount++ + atomic.AddInt64(&seg.totalTime, int64(now)) + atomic.AddInt64(&seg.totalCount, 1) seg.vacuumLen -= entryLen return } @@ -158,31 +159,31 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) if oldHdr.deleted { consecutiveEvacuate = 0 - seg.totalTime -= int64(oldHdr.accessTime) - seg.totalCount-- + atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) + atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen continue } expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now - leastRecentUsed := int64(oldHdr.accessTime)*seg.totalCount <= seg.totalTime + leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) if expired || leastRecentUsed || consecutiveEvacuate > 5 { seg.delEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff) if oldHdr.slotId == slotId { slotModified = true } consecutiveEvacuate = 0 - seg.totalTime -= int64(oldHdr.accessTime) - seg.totalCount-- + atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) + atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen if expired { - seg.totalExpired++ + atomic.AddInt64(&seg.totalExpired, 1) } } else { // evacuate an old entry that has been accessed recently for better cache hit rate. newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) consecutiveEvacuate++ - seg.totalEvacuate++ + atomic.AddInt64(&seg.totalEvacuate, 1) } } return @@ -196,7 +197,7 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint idx, match := seg.lookup(slot, hash16, key) if !match { err = ErrNotFound - seg.missCount += 1 + atomic.AddInt64(&seg.missCount, 1) return } ptr := &slot[idx] @@ -209,19 +210,18 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint if hdr.expireAt != 0 && hdr.expireAt <= now { seg.delEntryPtr(slotId, hash16, ptr.offset) - seg.totalExpired++ + atomic.AddInt64(&seg.totalExpired, 1) err = ErrNotFound - seg.missCount += 1 + atomic.AddInt64(&seg.missCount, 1) return } - - seg.totalTime += int64(now - hdr.accessTime) + atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime)) hdr.accessTime = now seg.rb.WriteAt(hdrBuf[:], ptr.offset) value = make([]byte, hdr.valLen) seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) - seg.hitCount += 1 + atomic.AddInt64(&seg.hitCount, 1) return } @@ -295,7 +295,7 @@ func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, id slotOff *= 2 } seg.slotLens[slotId]++ - seg.entryCount++ + atomic.AddInt64(&seg.entryCount, 1) slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] copy(slot[idx+1:], slot[idx:]) slot[idx].offset = offset @@ -317,7 +317,7 @@ func (seg *segment) delEntryPtr(slotId uint8, hash16 uint16, offset int64) { seg.rb.WriteAt(entryHdrBuf[:], offset) copy(slot[idx:], slot[idx+1:]) seg.slotLens[slotId]-- - seg.entryCount-- + atomic.AddInt64(&seg.entryCount, -1) } func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) { @@ -367,9 +367,9 @@ func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (i } func (seg *segment) resetStatistics() { - seg.totalEvacuate = 0 - seg.totalExpired = 0 - seg.overwrites = 0 - seg.hitCount = 0 - seg.missCount = 0 + atomic.StoreInt64(&seg.totalEvacuate, 0) + atomic.StoreInt64(&seg.totalExpired, 0) + atomic.StoreInt64(&seg.overwrites, 0) + atomic.StoreInt64(&seg.hitCount, 0) + atomic.StoreInt64(&seg.missCount, 0) } From 80c67dcc1597608a5a5eed31bdab64d2fe2afa4c Mon Sep 17 00:00:00 2001 From: jb0n Date: Mon, 12 Feb 2018 13:02:33 -0800 Subject: [PATCH 2/2] make Clear() threadsafe. Removed RWMutex. --- cache.go | 14 ++++++-------- cache_test.go | 10 +++++++++- segment.go | 20 ++++++++++++++++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/cache.go b/cache.go index 877e924..b4ba137 100644 --- a/cache.go +++ b/cache.go @@ -13,7 +13,7 @@ const ( ) type Cache struct { - locks [256]sync.RWMutex + locks [256]sync.Mutex segments [256]segment } @@ -52,9 +52,9 @@ func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { func (cache *Cache) Get(key []byte) (value []byte, err error) { hashVal := hashFunc(key) segId := hashVal & 255 - cache.locks[segId].RLock() + cache.locks[segId].Lock() value, _, err = cache.segments[segId].get(key, hashVal) - cache.locks[segId].RUnlock() + cache.locks[segId].Unlock() return } @@ -62,9 +62,9 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) { func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) { hashVal := hashFunc(key) segId := hashVal & 255 - cache.locks[segId].RLock() + cache.locks[segId].Lock() value, expireAt, err = cache.segments[segId].get(key, hashVal) - cache.locks[segId].RUnlock() + cache.locks[segId].Unlock() return } @@ -180,12 +180,10 @@ func (cache *Cache) OverwriteCount() (overwriteCount int64) { return } -//Clear is not threadsafe. func (cache *Cache) Clear() { for i := 0; i < 256; i++ { cache.locks[i].Lock() - newSeg := newSegment(len(cache.segments[i].rb.data), i) - cache.segments[i] = newSeg + cache.segments[i].clear() cache.locks[i].Unlock() } } diff --git a/cache_test.go b/cache_test.go index 1dd4e28..e8c669d 100644 --- a/cache_test.go +++ b/cache_test.go @@ -388,7 +388,7 @@ func TestRace(t *testing.T) { wg := sync.WaitGroup{} var iters int64 = 1000 - wg.Add(5) + wg.Add(6) addFunc := func() { var i int64 for i = 0; i < iters; i++ { @@ -434,12 +434,20 @@ func TestRace(t *testing.T) { } wg.Done() } + clearFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.Clear() + } + wg.Done() + } go addFunc() go getFunc() go delFunc() go evacFunc() go resetFunc() + go clearFunc() wg.Wait() } diff --git a/segment.go b/segment.go index fdedb4e..9307a27 100644 --- a/segment.go +++ b/segment.go @@ -373,3 +373,23 @@ func (seg *segment) resetStatistics() { atomic.StoreInt64(&seg.hitCount, 0) atomic.StoreInt64(&seg.missCount, 0) } + +func (seg *segment) clear() { + bufSize := len(seg.rb.data) + seg.rb = NewRingBuf(bufSize, 0) + seg.vacuumLen = int64(bufSize) + seg.slotCap = 1 + seg.slotsData = make([]entryPtr, 256*seg.slotCap) + for i := 0; i < len(seg.slotLens); i++ { + seg.slotLens[i] = 0 + } + + atomic.StoreInt64(&seg.hitCount, 0) + atomic.StoreInt64(&seg.missCount, 0) + atomic.StoreInt64(&seg.entryCount, 0) + atomic.StoreInt64(&seg.totalCount, 0) + atomic.StoreInt64(&seg.totalTime, 0) + atomic.StoreInt64(&seg.totalEvacuate, 0) + atomic.StoreInt64(&seg.totalExpired, 0) + atomic.StoreInt64(&seg.overwrites, 0) +}