Skip to content

Commit

Permalink
Merge pull request #536 from OdyseeTeam/improve-cache-getter-retry
Browse files Browse the repository at this point in the history
Add extra context and a retry delay for cache retrieval
  • Loading branch information
anbsky authored Jan 22, 2025
2 parents 29f6092 + a66a68d commit 891f37c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 20 deletions.
23 changes: 15 additions & 8 deletions app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ const (
)

type CacheRequest struct {
Method string
Params any
Method string
Params any
metaKey string
}

type CachedResponse struct {
Expand Down Expand Up @@ -75,12 +76,18 @@ func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCa
return qc, nil
}

func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) {
log := logger.Log()
cacheReq := CacheRequest{
Method: query.Method(),
Params: query.Params(),
func NewCacheRequest(method string, params any, metaKey string) CacheRequest {
return CacheRequest{
Method: method,
Params: params,
metaKey: metaKey,
}
}

func (c *QueryCache) Retrieve(query *Query, metaKey string, getter func() (any, error)) (*CachedResponse, error) {
log := logger.Log()

cacheReq := NewCacheRequest(query.Method(), query.Params(), metaKey)

ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -207,7 +214,7 @@ func (r CacheRequest) GetCacheKey() string {
params = string(p)
}
}
fmt.Fprintf(digester, "%s:%s:%s", "request", r.Method, params)
fmt.Fprintf(digester, "[%s]%s:%s:%s", r.metaKey, "request", r.Method, params)
hash := digester.Sum(nil)
return fmt.Sprintf("%x", hash)
}
Expand Down
24 changes: 16 additions & 8 deletions app/query/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,34 @@ func TestGetCacheKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
seen := map[string]bool{}
params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil}
genCacheKey := func(params map[string]any) string {
params := []map[string]any{
{},
{"uri": "what"},
{"uri": "odysee"},
nil,
}
genCacheKey := func(params map[string]any, metaKey string) string {
req := jsonrpc.NewRequest(MethodResolve, params)
query, err := NewQuery(req, "")
require.NoError(err)
cacheReq := CacheRequest{
Method: query.Method(),
Params: query.Params(),
}
cacheReq := NewCacheRequest(query.Method(), query.Params(), metaKey)
return cacheReq.GetCacheKey()
}
for _, p := range params {
t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) {
cacheKey := genCacheKey(p)
cacheKey := genCacheKey(p, "")
assert.Len(cacheKey, 32)
assert.NotContains(seen, cacheKey)
seen[cacheKey] = true
})
t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) {
cacheKey := genCacheKey(p, "user@endpoint")
assert.Len(cacheKey, 32)
assert.NotContains(seen, cacheKey)
seen[cacheKey] = true
})
}
assert.Contains(seen, genCacheKey(params[1]))
assert.Contains(seen, genCacheKey(params[1], "user@endpoint"))
}

func TestCachedResponseMarshal(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions app/query/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,11 +646,12 @@ func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCRespon
}
query := QueryFromContext(ctx)

retrieverRetries := 3
retriever := func() (any, error) {
getterRetries := config.GetCacheGetterRetries()
getter := func() (any, error) {
var resp *jsonrpc.RPCResponse
var err error
for attempt := range retrieverRetries {
for attempt := range getterRetries {
time.Sleep(time.Duration(attempt) * time.Second)
start := time.Now()
resp, err = caller.SendQuery(ctx, query)
duration := time.Since(start).Seconds()
Expand All @@ -674,7 +675,8 @@ func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCRespon
return resp, err
}

cachedResp, err := caller.Cache.Retrieve(query, retriever)
metaKey := fmt.Sprintf("%d@%s", caller.userID, caller.Endpoint())
cachedResp, err := caller.Cache.Retrieve(query, metaKey, getter)
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
Expand Down
5 changes: 5 additions & 0 deletions apps/lbrytv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func GetTokenCacheTimeout() time.Duration {
return Config.Viper.GetDuration("TokenCacheTimeout") * time.Second
}

func GetCacheGetterRetries() int {
return Config.Viper.GetInt("CacheGetterRetries")
}

func GetCORSDomains() []string {
return Config.Viper.GetStringSlice("CORSDomains")
}
Expand Down Expand Up @@ -254,4 +258,5 @@ func init() {
c.Viper.SetDefault("Address", ":8080")
c.Viper.SetDefault("Host", "http://localhost:8080")
c.Viper.SetDefault("Logging", map[string]string{"level": "debug", "format": "console"})
c.Viper.SetDefault("CacheGetterRetries", 3)
}

0 comments on commit 891f37c

Please sign in to comment.