diff --git a/DEPS.bzl b/DEPS.bzl index bf4ee0e36d695..004078cbd9108 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5867,13 +5867,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78", - strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241120071417-b5b7843d9037", + sha256 = "db34e3f94e5ac8fc5465b5440583c9e037a7f16aea8d0d8a200cdff210b12038", + strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20250102071301-c35d2b410115", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip", ], ) go_repository( diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 7fb781e7ad0ef..78e588639bcb4 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -90,7 +90,7 @@ go_test( ], embed = [":log_client"], flaky = True, - shard_count = 45, + shard_count = 46, deps = [ "//br/pkg/errors", "//br/pkg/glue", diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go index f78a54bf50c8a..1aaa472348ff8 100644 --- a/br/pkg/restore/log_client/export_test.go +++ b/br/pkg/restore/log_client/export_test.go @@ -127,3 +127,7 @@ func (helper *FakeStreamMetadataHelper) ReadFile( ) ([]byte, error) { return helper.Data[offset : offset+length], nil } + +func (m WithMigrations) CompactionDirs() []string { + return m.compactionDirs +} diff --git a/br/pkg/restore/log_client/migration.go b/br/pkg/restore/log_client/migration.go index a7b4307e0f568..c01a2f5095062 100644 --- a/br/pkg/restore/log_client/migration.go +++ b/br/pkg/restore/log_client/migration.go @@ -133,7 +133,11 @@ func (builder *WithMigrationsBuilder) coarseGrainedFilter(mig *backuppb.Migratio // log file [ .. .. .. .. ] // for _, compaction := range mig.Compactions { - if compaction.CompactionUntilTs < builder.shiftStartTS || compaction.CompactionFromTs > builder.restoredTS { + // Some old compaction may not contain input min / max ts. + // In that case, we should never filter it out. + rangeValid := compaction.InputMinTs != 0 && compaction.InputMaxTs != 0 + outOfRange := compaction.InputMaxTs < builder.shiftStartTS || compaction.InputMinTs > builder.restoredTS + if rangeValid && outOfRange { return true } } diff --git a/br/pkg/restore/log_client/migration_test.go b/br/pkg/restore/log_client/migration_test.go index 5368d7416dadf..f0c99439e3ae5 100644 --- a/br/pkg/restore/log_client/migration_test.go +++ b/br/pkg/restore/log_client/migration_test.go @@ -215,8 +215,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 0, - CompactionUntilTs: 9, + InputMinTs: 1, + InputMaxTs: 9, }, }, }, @@ -240,8 +240,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 50, - CompactionUntilTs: 52, + InputMinTs: 50, + InputMaxTs: 52, }, }, }, @@ -264,8 +264,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 50, - CompactionUntilTs: 52, + InputMinTs: 50, + InputMaxTs: 52, }, }, }, @@ -275,8 +275,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 120, - CompactionUntilTs: 140, + InputMinTs: 120, + InputMaxTs: 140, }, }, }, @@ -299,8 +299,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 50, - CompactionUntilTs: 52, + InputMinTs: 50, + InputMaxTs: 52, }, }, }, @@ -310,8 +310,8 @@ func TestMigrations(t *testing.T) { }, Compactions: []*backuppb.LogFileCompaction{ { - CompactionFromTs: 1200, - CompactionUntilTs: 1400, + InputMinTs: 1200, + InputMaxTs: 1400, }, }, }, @@ -329,24 +329,114 @@ func TestMigrations(t *testing.T) { } ctx := context.Background() - for _, cs := range cases { - builder := logclient.NewMigrationBuilder(10, 100, 200) - withMigrations := builder.Build(cs.migrations) - it := withMigrations.Metas(generateMetaNameIter()) - checkMetaNameIter(t, cs.expectStoreIds, it) - it = withMigrations.Metas(generateMetaNameIter()) - collect := iter.CollectAll(ctx, it) - require.NoError(t, collect.Err) - for j, meta := range collect.Item { - physicalIter := generatePhysicalIter(meta) - checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter) - physicalIter = generatePhysicalIter(meta) - collect := iter.CollectAll(ctx, physicalIter) + for i, cs := range cases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + builder := logclient.NewMigrationBuilder(10, 100, 200) + withMigrations := builder.Build(cs.migrations) + it := withMigrations.Metas(generateMetaNameIter()) + checkMetaNameIter(t, cs.expectStoreIds, it) + it = withMigrations.Metas(generateMetaNameIter()) + collect := iter.CollectAll(ctx, it) require.NoError(t, collect.Err) - for k, phy := range collect.Item { - logicalIter := generateLogicalIter(phy) - checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter) + for j, meta := range collect.Item { + physicalIter := generatePhysicalIter(meta) + checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter) + physicalIter = generatePhysicalIter(meta) + collect := iter.CollectAll(ctx, physicalIter) + require.NoError(t, collect.Err) + for k, phy := range collect.Item { + logicalIter := generateLogicalIter(phy) + checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter) + } } + }) + } +} + +func pack[T any](ts ...T) []T { + return ts +} + +func TestFilterOut(t *testing.T) { + type Case struct { + ShiftedStartTs uint64 + RestoredTs uint64 + Migs []*backuppb.Migration + + ExceptedCompactionsArtificateDir []string + } + withCompactTsCompaction := func(iMin, iMax, cFrom, cUntil uint64, name string) *backuppb.LogFileCompaction { + return &backuppb.LogFileCompaction{ + InputMinTs: iMin, + InputMaxTs: iMax, + CompactionFromTs: cFrom, + CompactionUntilTs: cUntil, + Artifacts: name, + } + } + simpleCompaction := func(iMin, iMax uint64, name string) *backuppb.LogFileCompaction { + return &backuppb.LogFileCompaction{ + InputMinTs: iMin, + InputMaxTs: iMax, + Artifacts: name, } } + makeMig := func(cs ...*backuppb.LogFileCompaction) *backuppb.Migration { + return &backuppb.Migration{Compactions: cs} + } + + cases := []Case{ + { + ShiftedStartTs: 50, + RestoredTs: 60, + Migs: pack( + makeMig(simpleCompaction(49, 61, "a")), + makeMig(simpleCompaction(61, 80, "b")), + ), + + ExceptedCompactionsArtificateDir: pack("a"), + }, + { + ShiftedStartTs: 30, + RestoredTs: 50, + Migs: pack( + makeMig(simpleCompaction(40, 60, "1a")), + makeMig(simpleCompaction(10, 20, "1b")), + makeMig(simpleCompaction(31, 50, "2a")), + makeMig(simpleCompaction(50, 80, "2b")), + ), + + ExceptedCompactionsArtificateDir: pack("1a", "2a", "2b"), + }, + { + ShiftedStartTs: 30, + RestoredTs: 50, + Migs: pack( + makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")), + makeMig(withCompactTsCompaction(10, 30, 15, 29, "b")), + makeMig(withCompactTsCompaction(8, 29, 10, 20, "c")), + ), + + ExceptedCompactionsArtificateDir: pack("a", "b"), + }, + { + ShiftedStartTs: 100, + RestoredTs: 120, + Migs: pack( + makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")), + makeMig(withCompactTsCompaction(0, 0, 15, 29, "b")), + makeMig(withCompactTsCompaction(0, 0, 10, 20, "c")), + ), + + ExceptedCompactionsArtificateDir: pack("a", "b", "c"), + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + b := logclient.NewMigrationBuilder(c.ShiftedStartTs, c.ShiftedStartTs, c.RestoredTs) + i := b.Build(c.Migs) + require.ElementsMatch(t, i.CompactionDirs(), c.ExceptedCompactionsArtificateDir) + }) + } } diff --git a/br/pkg/stream/stream_metas.go b/br/pkg/stream/stream_metas.go index 6801035ce7214..11e449dbe4097 100644 --- a/br/pkg/stream/stream_metas.go +++ b/br/pkg/stream/stream_metas.go @@ -1073,8 +1073,9 @@ func (m MigrationExt) doTruncating(ctx context.Context, mig *pb.Migration, resul // NOTE: Execution of truncation wasn't implemented here. // If we are going to truncate some files, for now we still need to use `br log truncate`. for _, compaction := range mig.Compactions { - // Can we also remove the compaction when `until-ts` is equal to `truncated-to`...? - if compaction.CompactionUntilTs > mig.TruncatedTo { + // We can only clean up a compaction when we are sure all its inputs + // are no more used. + if compaction.InputMaxTs > mig.TruncatedTo { result.NewBase.Compactions = append(result.NewBase.Compactions, compaction) } else { m.tryRemovePrefix(ctx, compaction.Artifacts, result) diff --git a/br/pkg/stream/stream_metas_test.go b/br/pkg/stream/stream_metas_test.go index c0fcbbae623ce..8279707b65b09 100644 --- a/br/pkg/stream/stream_metas_test.go +++ b/br/pkg/stream/stream_metas_test.go @@ -452,13 +452,13 @@ func mDstrPfx(path ...string) migOP { } } -func mCompaction(cPath, aPath string, fromTs, untilTs uint64) migOP { +func mCompaction(cPath, aPath string, minTs, maxTs uint64) migOP { return func(m *backuppb.Migration) { c := &backuppb.LogFileCompaction{} c.GeneratedFiles = cPath c.Artifacts = aPath - c.CompactionFromTs = fromTs - c.CompactionUntilTs = untilTs + c.InputMinTs = minTs + c.InputMaxTs = maxTs m.Compactions = append(m.Compactions, c) } } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 22fa031854fbe..3d0bf84ac4f78 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -179,6 +179,11 @@ func (t trivialFlushStream) RecvMsg(m any) error { return nil } +func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) { + f.flush() + return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil +} + func (f *fakeStore) GetID() uint64 { return f.id } diff --git a/go.mod b/go.mod index 7e9a0ac1a907d..fedc63af66beb 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 + github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115 github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index 0e36eb7a11528..9923e553c66c6 100644 --- a/go.sum +++ b/go.sum @@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115 h1:tFaBKtuVsTaYgWVa4fJVBHEi3vqdqRtmjMypEK+CN88= +github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=