From 67ee0dd2a7fcc3fc6fcc75bde4b51d1f7a5a4ddf Mon Sep 17 00:00:00 2001
From: hillium
Date: Fri, 10 Jan 2025 17:40:00 +0800
Subject: [PATCH] br: fix filtering of ingested SSTs

Ingested SST batches were filtered out only when their group TS
exceeded restoredTS, so batches fully ingested before the restore
start TS still leaked into the restore. Record startTS in
WithMigrations and also skip batches whose group TS falls below it.

Signed-off-by: hillium
---
 br/pkg/restore/log_client/migration.go        |   5 +-
 .../playground_but_must_ends_with_test.go     | 119 ++++++++++++++++++
 2 files changed, 123 insertions(+), 1 deletion(-)
 create mode 100644 br/pkg/restore/log_client/playground_but_must_ends_with_test.go

diff --git a/br/pkg/restore/log_client/migration.go b/br/pkg/restore/log_client/migration.go
index 9802e190162f8..81a458d42315b 100644
--- a/br/pkg/restore/log_client/migration.go
+++ b/br/pkg/restore/log_client/migration.go
@@ -165,6 +165,7 @@ func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigr
 		compactionDirs: compactionDirs,
 		fullBackups:    fullBackups,
 		restoredTS:     builder.restoredTS,
+		startTS:        builder.startTS,
 	}
 	return withMigrations
 }
@@ -218,6 +219,7 @@ type WithMigrations struct {
 	compactionDirs []string
 	fullBackups    []string
 	restoredTS     uint64
+	startTS        uint64
 }
 
 func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
@@ -249,7 +251,8 @@ func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalSto
 
 func (wm *WithMigrations) IngestedSSTss(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.IngestedSSTs] {
 	filteredOut := iter.FilterOut(stream.LoadIngestedSSTss(ctx, s, wm.fullBackups), func(ebk stream.IngestedSSTss) bool {
-		return !ebk.GroupFinished() || ebk.GroupTS() > wm.restoredTS
+		gts := ebk.GroupTS()
+		return !ebk.GroupFinished() || gts < wm.startTS || gts > wm.restoredTS
 	})
 	return iter.FlatMap(filteredOut, func(ebk stream.IngestedSSTss) iter.TryNextor[*backuppb.IngestedSSTs] {
 		return iter.Map(iter.FromSlice(ebk), func(p stream.PathedIngestedSSTs) *backuppb.IngestedSSTs {
diff --git a/br/pkg/restore/log_client/playground_but_must_ends_with_test.go b/br/pkg/restore/log_client/playground_but_must_ends_with_test.go
new file mode 100644
index 0000000000000..80fded042e4eb
--- /dev/null
+++ b/br/pkg/restore/log_client/playground_but_must_ends_with_test.go
@@ -0,0 +1,119 @@
+package logclient
+
+import (
+	"context"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"math"
+	"os"
+	"testing"
+
+	"github.com/cheggaaa/pb/v3"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/br/pkg/stream"
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/meta"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	_t_ *testing.T
+)
+
+func must[T any](t T, err error) T {
+	require.NoError(_t_, err)
+	return t
+}
+
+func TestXXX(t *testing.T) {
+
+	req := require.New(t)
+	_t_ = t
+	defer func() { _t_ = nil }()
+	ctx := context.Background()
+
+	strgURL, ok := os.LookupEnv("storage")
+	req.True(ok)
+	outFile := "./tables.txt"
+	if of, ok := os.LookupEnv("out"); ok {
+		outFile = of
+	}
+	outFileFd := must(os.Create(outFile))
+
+	s := must(storage.NewFromURL(ctx, strgURL))
+	mb := new(WithMigrationsBuilder)
+	wm := mb.Build(nil)
+	fileMgr := must(CreateLogFileManager(ctx, LogFileManagerInit{
+		StartTS:   0,
+		RestoreTS: math.MaxInt64,
+		Storage:   s,
+
+		MigrationsBuilder:         mb,
+		Migrations:                &wm,
+		MetadataDownloadBatchSize: 128,
+	}))
+	files := must(fileMgr.LoadDDLFilesAndCountDMLFiles(ctx))
+	hp := fileMgr.helper
+	p := pb.New(len(files))
+	p.Start()
+	defer p.Finish()
+
+	for _, file := range files {
+		content := must(hp.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType, s,
+			file.FileEncryptionInfo))
+		it := stream.NewEventIterator(content)
+		p.Increment()
+
+		for it.Valid() {
+			it.Next()
+
+			txnEntry := kv.Entry{Key: it.Key(), Value: it.Value()}
+
+			if !stream.IsMetaDBKey(txnEntry.Key) {
+				// this playground only dumps meta-DB keys (schema info)
+				continue
+			}
+
+			rawKey := must(stream.ParseTxnMetaKeyFrom(txnEntry.Key))
+			value := must(stream.ExtractValue(&txnEntry, file.Cf))
+			opType := "PUT"
+			if file.Cf == "write" {
+				// parse the write CF value to classify the operation
+				rawWriteCFValue := new(stream.RawWriteCFValue)
+				req.NoError(rawWriteCFValue.ParseFrom(txnEntry.Value))
+				if rawWriteCFValue.IsDelete() {
+					opType = "DEL"
+				}
+				if rawWriteCFValue.IsRollback() {
+					opType = "ROLLBACK"
+				}
+			}
+
+			if meta.IsDBkey(rawKey.Field) {
+				if len(value) == 0 {
+					must(fmt.Fprintf(outFileFd, "DB:empty:%s:%s:%s\n", file.Cf, opType, hex.EncodeToString(it.Key())))
+					continue
+				}
+				// dump the db id, name, cf and key hex to outFileFd
+				dbInfo := new(model.DBInfo)
+				req.NoError(json.Unmarshal(value, dbInfo))
+				must(fmt.Fprintf(outFileFd, "DB:%d:%s:%s:%s\n", dbInfo.ID, dbInfo.Name.O, file.Cf, hex.EncodeToString(it.Key())))
+			}
+
+			if meta.IsTableKey(rawKey.Field) {
+				if len(value) == 0 {
+					must(fmt.Fprintf(outFileFd, "Table:empty:%s:%s:%s\n", file.Cf, opType, hex.EncodeToString(it.Key())))
+					continue
+				}
+				dbID := must(stream.ParseDBIDFromTableKey(txnEntry.Key))
+				tableInfo := new(model.TableInfo)
+				req.NoError(json.Unmarshal(value, tableInfo), "tableinfo = %s\n", value)
+				must(fmt.Fprintf(outFileFd, "Table:%d:%d:%s:%s\n", dbID, tableInfo.ID, tableInfo.Name.O, hex.EncodeToString(it.Key())))
+			}
+		}
+
+	}
+
+	req.NoError(outFileFd.Close())
+}
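--
Reviewer note (not part of the patch): a minimal, self-contained sketch of the
boundary semantics the changed predicate in IngestedSSTss should have. The
filteredOut helper below is a stand-in for the inline closure, and the
groupFinished/groupTS parameters stand in for the GroupFinished()/GroupTS()
accessors of stream.IngestedSSTss; run it with `go run` to eyeball the table.

package main

import "fmt"

// filteredOut mirrors the changed filter: a batch of ingested SSTs is dropped
// when its group is unfinished, or when its group TS falls outside the closed
// window [startTS, restoredTS].
func filteredOut(groupFinished bool, groupTS, startTS, restoredTS uint64) bool {
	return !groupFinished || groupTS < startTS || groupTS > restoredTS
}

func main() {
	const startTS, restoredTS = 100, 200
	cases := []struct {
		finished bool
		groupTS  uint64
		wantDrop bool
	}{
		{true, 99, true},   // below startTS: dropped now (the fix)
		{true, 100, false}, // lower bound is inclusive
		{true, 200, false}, // upper bound is inclusive
		{true, 201, true},  // above restoredTS: dropped as before
		{false, 150, true}, // unfinished groups are always dropped
	}
	for _, c := range cases {
		got := filteredOut(c.finished, c.groupTS, startTS, restoredTS)
		fmt.Printf("finished=%v ts=%d -> drop=%v (want %v)\n", c.finished, c.groupTS, got, c.wantDrop)
	}
}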