Skip to content

Commit

Permalink
fix: fix primary cache not used and finish single flight
Browse files Browse the repository at this point in the history
  • Loading branch information
asjdf committed Feb 24, 2023
1 parent 8158c20 commit efc966f
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 28 deletions.
98 changes: 70 additions & 28 deletions cache/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/asjdf/gorm-cache/config"
"github.com/asjdf/gorm-cache/storage"
"github.com/asjdf/gorm-cache/util"
"github.com/hashicorp/go-multierror"
"gorm.io/gorm"
"gorm.io/gorm/callbacks"
"reflect"
Expand Down Expand Up @@ -56,8 +57,17 @@ func (h *queryHandler) BeforeQuery() func(db *gorm.DB) {
db.InstanceSet("gorm:cache:vars", db.Statement.Vars)

if util.ShouldCache(tableName, cache.Config.Tables) {
hitted := false
defer func() {
if hitted {
cache.IncrHitCount()
} else {
cache.IncrMissCount()
}
}()

// singleFlight Check
singleFlightKey := fmt.Sprintf("%s:%s", tableName, sql)
singleFlightKey := fmt.Sprintf("%s:%s", tableName, sql) // todo: key with vars
h.singleFlight.mu.Lock()
if h.singleFlight.m == nil {
h.singleFlight.m = make(map[string]*call)
Expand All @@ -78,11 +88,11 @@ func (h *queryHandler) BeforeQuery() func(db *gorm.DB) {
_ = db.AddError(err)
return
}
hitted = true
db.RowsAffected = c.rowsAffected
if c.err == nil { // 为保证后续流程不走,必须设一个error
db.Error = util.SingleFlightHit
} else {
db.Error = c.err
db.Error = multierror.Append(util.SingleFlightHit) // 为保证后续流程不走,必须设一个error
if c.err != nil {
db.Error = multierror.Append(db.Error, c.err)
}
h.cache.Logger.CtxInfo(ctx, "[BeforeQuery] single flight hit for key %v", singleFlightKey)
return
Expand All @@ -93,8 +103,7 @@ func (h *queryHandler) BeforeQuery() func(db *gorm.DB) {
h.singleFlight.mu.Unlock()
db.InstanceSet("gorm:cache:query:single_flight_call", c)

// try primary cache first
if cache.Config.CacheLevel == config.CacheLevelAll || cache.Config.CacheLevel == config.CacheLevelOnlyPrimary {
tryPrimaryCache := func() (hitted bool) {
primaryKeys := getPrimaryKeysFromWhereClause(db)
cache.Logger.CtxInfo(ctx, "[BeforeQuery] parse primary keys = %v", primaryKeys)

Expand Down Expand Up @@ -141,17 +150,17 @@ func (h *queryHandler) BeforeQuery() func(db *gorm.DB) {
return
}
db.Error = util.PrimaryCacheHit
hitted = true
return
}

if cache.Config.CacheLevel == config.CacheLevelAll || cache.Config.CacheLevel == config.CacheLevelOnlySearch {
trySearchCache := func() (hitted bool) {
// search cache hit
cacheValue, err := cache.GetSearchCache(ctx, tableName, sql, db.Statement.Vars...)
if err != nil {
if !errors.Is(err, storage.ErrCacheNotFound) {
cache.Logger.CtxError(ctx, "[BeforeQuery] get cache value for sql %s error: %v", sql, err)
}
cache.IncrMissCount()
db.Error = nil
return
}
Expand All @@ -174,8 +183,21 @@ func (h *queryHandler) BeforeQuery() func(db *gorm.DB) {
return
}
db.Error = util.SearchCacheHit
hitted = true
return
}

if cache.Config.CacheLevel == config.CacheLevelAll || cache.Config.CacheLevel == config.CacheLevelOnlyPrimary {
if tryPrimaryCache() {
hitted = true
return
}
}
if cache.Config.CacheLevel == config.CacheLevelAll || cache.Config.CacheLevel == config.CacheLevelOnlySearch {
if !hitted && trySearchCache() {
hitted = true
}
}
}
}
}
Expand All @@ -196,7 +218,11 @@ func (h *queryHandler) AfterQuery() func(db *gorm.DB) {
varObj, _ := db.InstanceGet("gorm:cache:vars")
vars := varObj.([]interface{})

if db.Error == nil && util.ShouldCache(tableName, cache.Config.Tables) {
if !util.ShouldCache(tableName, cache.Config.Tables) {
return
}

if db.Error == nil {
// error is nil -> cache not hit, we cache newly retrieved data
primaryKeys, objects := getObjectsAfterLoad(db)

Expand Down Expand Up @@ -266,29 +292,45 @@ func (h *queryHandler) AfterQuery() func(db *gorm.DB) {
return
}

if !cache.Config.DisableCachePenetrationProtect {
if errors.Is(db.Error, gorm.ErrRecordNotFound) { // 应对缓存穿透 未来可能考虑使用其他过滤器实现:如布隆过滤器
cache.Logger.CtxInfo(ctx, "[AfterQuery] set cache: %v", "recordNotFound")
err := cache.SetSearchCache(ctx, "recordNotFound", tableName, sql, vars...)
if err != nil {
cache.Logger.CtxError(ctx, "[AfterQuery] set search cache for sql: %s error: %v", sql, err)
return
}
cache.Logger.CtxInfo(ctx, "[AfterQuery] sql %s cached", sql)
// 应对缓存穿透 未来可能考虑使用其他过滤器实现:如布隆过滤器
if db.Error == gorm.ErrRecordNotFound && !cache.Config.DisableCachePenetrationProtect {
cache.Logger.CtxInfo(ctx, "[AfterQuery] set cache: %v", "recordNotFound")
err := cache.SetSearchCache(ctx, "recordNotFound", tableName, sql, vars...)
if err != nil {
cache.Logger.CtxError(ctx, "[AfterQuery] set search cache for sql: %s error: %v", sql, err)
return
}
}

switch db.Error {
case util.RecordNotFoundCacheHit:
db.Error = gorm.ErrRecordNotFound
cache.IncrHitCount()
case util.SearchCacheHit, util.PrimaryCacheHit, util.SingleFlightHit:
db.Error = nil
cache.IncrHitCount()
cache.Logger.CtxInfo(ctx, "[AfterQuery] sql %s cached", sql)
return
}
}()
// 之所以将上面的部分包在一个匿名函数中是为了方便
// 上面的cache完成后直接传播给其他等待中的goroutine
// 上面只处理非singleflight且无错误或记录不存在的情况
h.fillCallAfterQuery(db)

// 下面处理命中了缓存的情况
// 有以下几种err是专门用来传状态的:正常的cachehit 这种情况不存在error
// RecordNotFoundCacheHit 这种情况只会在notfound之后出现
// SingleFlightHit 这种情况下error中除了SingleFlightHit还可能会存在其他error来自gorm的error
// 且遇到任何一种hit我们都可以认为是命中了缓存 同时只可能命中至多两个hit(single+其他
if merr, ok := db.Error.(*multierror.Error); ok {
errs := merr.WrappedErrors()
if errors.Is(errs[0], util.SingleFlightHit) {
if len(errs) > 1 {
db.Error = errs[1]
} else {
db.Error = nil
}
}
}

switch db.Error {
case util.RecordNotFoundCacheHit:
db.Error = gorm.ErrRecordNotFound
case util.SearchCacheHit, util.PrimaryCacheHit:
db.Error = nil
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ require (
github.com/glebarez/go-sqlite v1.20.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
Expand All @@ -27,6 +29,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230126093431-47fa9a501578 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/sys v0.4.0 // indirect
modernc.org/libc v1.22.2 // indirect
modernc.org/mathutil v1.5.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down Expand Up @@ -54,6 +58,10 @@ github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down

0 comments on commit efc966f

Please sign in to comment.