fix filtering ingested ssts
Signed-off-by: hillium <[email protected]>
YuJuncen committed Jan 10, 2025
1 parent 2e2dd07 commit 67ee0dd
Showing 2 changed files with 123 additions and 1 deletion.
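
In short: the restore's startTS is now threaded through WithMigrations, and IngestedSSTss drops ingested-SST batches whose GroupTS falls outside the [startTS, restoredTS] window instead of only those above restoredTS. The second file is a throwaway playground test that dumps the meta-key events found in a log backup to a text file.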
5 changes: 4 additions & 1 deletion br/pkg/restore/log_client/migration.go
@@ -165,6 +165,7 @@ func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigr
 		compactionDirs: compactionDirs,
 		fullBackups:    fullBackups,
 		restoredTS:     builder.restoredTS,
+		startTS:        builder.startTS,
 	}
 	return withMigrations
 }
@@ -218,6 +219,7 @@ type WithMigrations struct {
 	compactionDirs []string
 	fullBackups    []string
 	restoredTS     uint64
+	startTS        uint64
 }
 
 func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
@@ -249,7 +251,8 @@ func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalSto
 
 func (wm *WithMigrations) IngestedSSTss(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.IngestedSSTs] {
 	filteredOut := iter.FilterOut(stream.LoadIngestedSSTss(ctx, s, wm.fullBackups), func(ebk stream.IngestedSSTss) bool {
-		return !ebk.GroupFinished() || ebk.GroupTS() > wm.restoredTS
+		gts := ebk.GroupTS()
+		return !ebk.GroupFinished() || gts < wm.startTS || gts > wm.restoredTS
 	})
 	return iter.FlatMap(filteredOut, func(ebk stream.IngestedSSTss) iter.TryNextor[*backuppb.IngestedSSTs] {
 		return iter.Map(iter.FromSlice(ebk), func(p stream.PathedIngestedSSTs) *backuppb.IngestedSSTs {
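
Before this fix, the FilterOut predicate only rejected unfinished groups and groups newer than restoredTS, so ingested-SST batches that finished before the restore's startTS still slipped through; the new lower-bound check drops those too. A minimal, standalone sketch of the kept/dropped decision (the ingestedSSTs struct and keep helper are illustrative stand-ins, not the real stream.IngestedSSTss API):

package main

import "fmt"

// ingestedSSTs is a stand-in for one batch of ingested SSTs;
// the real type lives in br/pkg/stream.
type ingestedSSTs struct {
	groupFinished bool
	groupTS       uint64
}

// keep mirrors the negation of the FilterOut predicate above: a batch
// survives only if its group is finished and its GroupTS lies inside
// [startTS, restoredTS].
func keep(b ingestedSSTs, startTS, restoredTS uint64) bool {
	return b.groupFinished && b.groupTS >= startTS && b.groupTS <= restoredTS
}

func main() {
	fmt.Println(keep(ingestedSSTs{true, 50}, 100, 200))   // false: before startTS (the new check)
	fmt.Println(keep(ingestedSSTs{true, 150}, 100, 200))  // true: inside the restore window
	fmt.Println(keep(ingestedSSTs{true, 250}, 100, 200))  // false: after restoredTS
	fmt.Println(keep(ingestedSSTs{false, 150}, 100, 200)) // false: group not finished
}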
119 changes: 119 additions & 0 deletions br/pkg/restore/log_client/playground_but_must_ends_with_test.go
@@ -0,0 +1,119 @@
package logclient

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math"
	"os"
	"testing"

	"github.com/cheggaaa/pb/v3"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/br/pkg/stream"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/stretchr/testify/require"
)

var (
	_t_ *testing.T
)

// must unwraps a (value, error) pair, failing the test stored in _t_ on error.
func must[T any](t T, err error) T {
	require.NoError(_t_, err)
	return t
}

func TestXXX(t *testing.T) {
	req := require.New(t)
	_t_ = t
	defer func() { _t_ = nil }()
	ctx := context.Background()

	// The backup storage URL comes from the "storage" env var; the output
	// path defaults to ./tables.txt and can be overridden via "out".
	strgURL, ok := os.LookupEnv("storage")
	req.True(ok)
	outFile := "./tables.txt"
	if of, ok := os.LookupEnv("out"); ok {
		outFile = of
	}
	outFileFd := must(os.Create(outFile))

	s := must(storage.NewFromURL(ctx, strgURL))
	mb := new(WithMigrationsBuilder)
	wm := mb.Build(nil)
	fileMgr := must(CreateLogFileManager(ctx, LogFileManagerInit{
		StartTS:   0,
		RestoreTS: math.MaxInt64,
		Storage:   s,

		MigrationsBuilder:         mb,
		Migrations:                &wm,
		MetadataDownloadBatchSize: 128,
	}))
	files := must(fileMgr.LoadDDLFilesAndCountDMLFiles(ctx))
	hp := fileMgr.helper
	p := pb.New(len(files))
	p.Start()
	defer p.Finish()

	for _, file := range files {
		content := must(hp.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType, s, file.FileEncryptionInfo))
		it := stream.NewEventIterator(content)
		p.Increment()

		for it.Valid() {
			it.Next()

			txnEntry := kv.Entry{Key: it.Key(), Value: it.Value()}

			if !stream.IsMetaDBKey(txnEntry.Key) {
				// only inspect meta (mDB) keys; skip everything else
				continue
			}

			rawKey := must(stream.ParseTxnMetaKeyFrom(txnEntry.Key))
			value := must(stream.ExtractValue(&txnEntry, file.Cf))
			opType := "PUT"
			if file.Cf == "write" {
				// parse the write CF record to tell deletes and rollbacks from puts
				rawWriteCFValue := new(stream.RawWriteCFValue)
				req.NoError(rawWriteCFValue.ParseFrom(txnEntry.Value))
				if rawWriteCFValue.IsDelete() {
					opType = "DEL"
				}
				if rawWriteCFValue.IsRollback() {
					opType = "ROLLBACK"
				}
			}

			if meta.IsDBkey(rawKey.Field) {
				if len(value) == 0 {
					must(fmt.Fprintf(outFileFd, "DB:empty:%s:%s:%s\n", file.Cf, opType, hex.EncodeToString(it.Key())))
					continue
				}
				// print the DB ID, DB name, CF, and key hex to outFileFd
				dbInfo := new(model.DBInfo)
				req.NoError(json.Unmarshal(value, dbInfo))
				must(fmt.Fprintf(outFileFd, "DB:%d:%s:%s:%s\n", dbInfo.ID, dbInfo.Name.O, file.Cf, hex.EncodeToString(it.Key())))
			}

			if meta.IsTableKey(rawKey.Field) {
				if len(value) == 0 {
					must(fmt.Fprintf(outFileFd, "Table:empty:%s:%s:%s\n", file.Cf, opType, hex.EncodeToString(it.Key())))
					continue
				}
				dbID := must(stream.ParseDBIDFromTableKey(txnEntry.Key))
				tableInfo := new(model.TableInfo)
				req.NoError(json.Unmarshal(value, tableInfo), "tableinfo = %s\n", value)
				must(fmt.Fprintf(outFileFd, "Table:%d:%d:%s:%s\n", dbID, tableInfo.ID, tableInfo.Name.O, hex.EncodeToString(it.Key())))
			}
		}
	}

	req.NoError(outFileFd.Close())
}
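
As the file name suggests, this is a throwaway playground test: it only does something useful when pointed at real backup data via the storage environment variable (it fails immediately when that is unset), it writes its report to ./tables.txt unless out overrides the path, and it opens the log-file manager with StartTS 0 and RestoreTS math.MaxInt64 so that no events are filtered out by time.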
