Skip to content

Commit

Permalink
feat: Skip writeback for chunks fetched by queriers older than a dura…
Browse files Browse the repository at this point in the history
…tion (#15605)
  • Loading branch information
mveitas authored Jan 6, 2025
1 parent 226e9f1 commit 3b8d993
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 38 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,11 @@ The `chunk_store_config` block configures how chunks will be cached and how long
# The CLI flags prefix for this block configuration is: store.index-cache-write
[write_dedupe_cache_config: <cache_config>]
# Chunks fetched from queriers before this duration will not be written to the
# cache. A value of 0 will write all chunks to the cache
# CLI flag: -store.skip-query-writeback-older-than
[skip_query_writeback_cache_older_than: <duration> | default = 0s]
# Chunks will be handed off to the L2 cache after this duration. 0 to disable L2
# cache.
# CLI flag: -store.chunks-cache-l2.handoff
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) {
},
}

fetcher, err := fetcher.New(c, nil, false, s, nil, 0)
fetcher, err := fetcher.New(c, nil, false, s, nil, 0, 0)
require.NoError(t, err)
defer fetcher.Stop()

Expand Down
27 changes: 18 additions & 9 deletions pkg/storage/chunk/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type Fetcher struct {
cachel2 cache.Cache
cacheStubs bool

l2CacheHandoff time.Duration
l2CacheHandoff time.Duration
skipQueryWritebackCacheOlderThan time.Duration

wait sync.WaitGroup
decodeRequests chan decodeRequest
Expand All @@ -69,15 +70,16 @@ type decodeResponse struct {
}

// New makes a new ChunkFetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) {
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration, skipQueryWritebackOlderThan time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
skipQueryWritebackCacheOlderThan: skipQueryWritebackOlderThan,
decodeRequests: make(chan decodeRequest),
}

c.wait.Add(chunkDecodeParallelism)
Expand Down Expand Up @@ -138,6 +140,9 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun
l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks))

for _, m := range chunks {
if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
continue
}
// Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable
// expectation to find it there.
if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) {
Expand Down Expand Up @@ -230,6 +235,10 @@ func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) erro
keysL2 := make([]string, 0, len(chunks))
bufsL2 := make([][]byte, 0, len(chunks))
for i := range chunks {
if c.skipQueryWritebackCacheOlderThan > 0 && chunks[i].From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
continue
}

var encoded []byte
var err error
if !c.cacheStubs {
Expand Down
59 changes: 37 additions & 22 deletions pkg/storage/chunk/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import (
func Test(t *testing.T) {
now := time.Now()
tests := []struct {
name string
handoff time.Duration
storeStart []chunk.Chunk
l1Start []chunk.Chunk
l2Start []chunk.Chunk
fetch []chunk.Chunk
l1KeysRequested int
l1End []chunk.Chunk
l2KeysRequested int
l2End []chunk.Chunk
name string
handoff time.Duration
skipQueryWriteback time.Duration
storeStart []chunk.Chunk
l1Start []chunk.Chunk
l2Start []chunk.Chunk
fetch []chunk.Chunk
l1KeysRequested int
l1End []chunk.Chunk
l2KeysRequested int
l2End []chunk.Chunk
}{
{
name: "all found in L1 cache",
Expand Down Expand Up @@ -82,6 +83,19 @@ func Test(t *testing.T) {
l2KeysRequested: 3,
l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}),
},
{
name: "skipQueryWriteback",
handoff: 24 * time.Hour,
skipQueryWriteback: 3 * 24 * time.Hour,
storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}),
l1Start: []chunk.Chunk{},
l2Start: []chunk.Chunk{},
fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}),
l1KeysRequested: 3,
l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}),
l2KeysRequested: 0,
l2End: []chunk.Chunk{},
},
{
name: "writeback l1",
handoff: 24 * time.Hour,
Expand Down Expand Up @@ -194,7 +208,7 @@ func Test(t *testing.T) {
assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart))

// Build fetcher
f, err := New(c1, c2, false, sc, chunkClient, test.handoff)
f, err := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback)
assert.NoError(t, err)

// Run the test
Expand Down Expand Up @@ -235,16 +249,17 @@ func BenchmarkFetch(b *testing.B) {
fetch = append(fetch, storeStart...)

test := struct {
name string
handoff time.Duration
storeStart []chunk.Chunk
l1Start []chunk.Chunk
l2Start []chunk.Chunk
fetch []chunk.Chunk
l1KeysRequested int
l1End []chunk.Chunk
l2KeysRequested int
l2End []chunk.Chunk
name string
handoff time.Duration
skipQueryWriteback time.Duration
storeStart []chunk.Chunk
l1Start []chunk.Chunk
l2Start []chunk.Chunk
fetch []chunk.Chunk
l1KeysRequested int
l1End []chunk.Chunk
l2KeysRequested int
l2End []chunk.Chunk
}{
name: "some in L1, some in L2",
handoff: time.Duration(numchunks/3+100) * time.Hour,
Expand Down Expand Up @@ -291,7 +306,7 @@ func BenchmarkFetch(b *testing.B) {
_ = chunkClient.PutChunks(context.Background(), test.storeStart)

// Build fetcher
f, _ := New(c1, c2, false, sc, chunkClient, test.handoff)
f, _ := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback)

for i := 0; i < b.N; i++ {
_, err := f.FetchChunks(context.Background(), test.fetch)
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type ChunkStoreConfig struct {
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"`
ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"`
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."`
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"`
ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"`
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."`
SkipQueryWritebackOlderThan time.Duration `yaml:"skip_query_writeback_cache_older_than"`

L2ChunkCacheHandoff time.Duration `yaml:"l2_chunk_cache_handoff"`
CacheLookupsOlderThan model.Duration `yaml:"cache_lookups_older_than"`
Expand All @@ -38,6 +39,7 @@ func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.L2ChunkCacheHandoff, "store.chunks-cache-l2.handoff", 0, "Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 cache.")
f.BoolVar(&cfg.chunkCacheStubs, "store.chunks-cache.cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "", f)
f.DurationVar(&cfg.SkipQueryWritebackOlderThan, "store.skip-query-writeback-older-than", 0, "Chunks fetched from queriers before this duration will not be written to the cache. A value of 0 will write all chunks to the cache")

f.Var(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", "Cache index entries older than this period. 0 to disable.")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *LokiStore) init() error {
if err != nil {
return err
}
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff)
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff, s.storeCfg.SkipQueryWritebackOlderThan)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/series_store_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestChunkWriter_PutOne(t *testing.T) {
idx := &mockIndexWriter{}
client := &mockChunksClient{}

f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0)
f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0, 0)
require.NoError(t, err)

cw := NewChunkWriter(f, schemaConfig, idx, true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time,
panic(err)
}

f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0)
f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0, 0)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 3b8d993

Please sign in to comment.