Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/restore/log_client: use input ts as filter #58734

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5867,13 +5867,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78",
strip_prefix = "github.com/pingcap/[email protected]20241120071417-b5b7843d9037",
sha256 = "db34e3f94e5ac8fc5465b5440583c9e037a7f16aea8d0d8a200cdff210b12038",
strip_prefix = "github.com/pingcap/[email protected]20250102071301-c35d2b410115",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/log_client/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ func (helper *FakeStreamMetadataHelper) ReadFile(
) ([]byte, error) {
return helper.Data[offset : offset+length], nil
}

func (m WithMigrations) CompactionDirs() []string {
return m.compactionDirs
}
6 changes: 5 additions & 1 deletion br/pkg/restore/log_client/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (builder *WithMigrationsBuilder) coarseGrainedFilter(mig *backuppb.Migratio
// log file [ .. .. .. .. ]
//
for _, compaction := range mig.Compactions {
if compaction.CompactionUntilTs < builder.shiftStartTS || compaction.CompactionFromTs > builder.restoredTS {
// Some old compaction may not contain input min / max ts.
// In that case, we should never filter it out.
rangeValid := compaction.InputMinTs != 0 && compaction.InputMaxTs != 0
outOfRange := compaction.InputMaxTs < builder.shiftStartTS || compaction.InputMinTs > builder.restoredTS
if rangeValid && outOfRange {
return true
}
}
Expand Down
146 changes: 118 additions & 28 deletions br/pkg/restore/log_client/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 0,
CompactionUntilTs: 9,
InputMinTs: 1,
InputMaxTs: 9,
},
},
},
Expand All @@ -240,8 +240,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -264,8 +264,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -275,8 +275,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 120,
CompactionUntilTs: 140,
InputMinTs: 120,
InputMaxTs: 140,
},
},
},
Expand All @@ -299,8 +299,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -310,8 +310,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 1200,
CompactionUntilTs: 1400,
InputMinTs: 1200,
InputMaxTs: 1400,
},
},
},
Expand All @@ -329,24 +329,114 @@ func TestMigrations(t *testing.T) {
}

ctx := context.Background()
for _, cs := range cases {
builder := logclient.NewMigrationBuilder(10, 100, 200)
withMigrations := builder.Build(cs.migrations)
it := withMigrations.Metas(generateMetaNameIter())
checkMetaNameIter(t, cs.expectStoreIds, it)
it = withMigrations.Metas(generateMetaNameIter())
collect := iter.CollectAll(ctx, it)
require.NoError(t, collect.Err)
for j, meta := range collect.Item {
physicalIter := generatePhysicalIter(meta)
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
physicalIter = generatePhysicalIter(meta)
collect := iter.CollectAll(ctx, physicalIter)
for i, cs := range cases {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
builder := logclient.NewMigrationBuilder(10, 100, 200)
withMigrations := builder.Build(cs.migrations)
it := withMigrations.Metas(generateMetaNameIter())
checkMetaNameIter(t, cs.expectStoreIds, it)
it = withMigrations.Metas(generateMetaNameIter())
collect := iter.CollectAll(ctx, it)
require.NoError(t, collect.Err)
for k, phy := range collect.Item {
logicalIter := generateLogicalIter(phy)
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
for j, meta := range collect.Item {
physicalIter := generatePhysicalIter(meta)
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
physicalIter = generatePhysicalIter(meta)
collect := iter.CollectAll(ctx, physicalIter)
require.NoError(t, collect.Err)
for k, phy := range collect.Item {
logicalIter := generateLogicalIter(phy)
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
}
}
})
}
}

func pack[T any](ts ...T) []T {
return ts
}

func TestFilterOut(t *testing.T) {
type Case struct {
ShiftedStartTs uint64
RestoredTs uint64
Migs []*backuppb.Migration

ExceptedCompactionsArtificateDir []string
}
withCompactTsCompaction := func(iMin, iMax, cFrom, cUntil uint64, name string) *backuppb.LogFileCompaction {
return &backuppb.LogFileCompaction{
InputMinTs: iMin,
InputMaxTs: iMax,
CompactionFromTs: cFrom,
CompactionUntilTs: cUntil,
Artifacts: name,
}
}
simpleCompaction := func(iMin, iMax uint64, name string) *backuppb.LogFileCompaction {
return &backuppb.LogFileCompaction{
InputMinTs: iMin,
InputMaxTs: iMax,
Artifacts: name,
}
}
makeMig := func(cs ...*backuppb.LogFileCompaction) *backuppb.Migration {
return &backuppb.Migration{Compactions: cs}
}

cases := []Case{
{
ShiftedStartTs: 50,
RestoredTs: 60,
Migs: pack(
makeMig(simpleCompaction(49, 61, "a")),
makeMig(simpleCompaction(61, 80, "b")),
),

ExceptedCompactionsArtificateDir: pack("a"),
},
{
ShiftedStartTs: 30,
RestoredTs: 50,
Migs: pack(
makeMig(simpleCompaction(40, 60, "1a")),
makeMig(simpleCompaction(10, 20, "1b")),
makeMig(simpleCompaction(31, 50, "2a")),
makeMig(simpleCompaction(50, 80, "2b")),
),

ExceptedCompactionsArtificateDir: pack("1a", "2a", "2b"),
},
{
ShiftedStartTs: 30,
RestoredTs: 50,
Migs: pack(
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
makeMig(withCompactTsCompaction(10, 30, 15, 29, "b")),
makeMig(withCompactTsCompaction(8, 29, 10, 20, "c")),
),

ExceptedCompactionsArtificateDir: pack("a", "b"),
},
{
ShiftedStartTs: 100,
RestoredTs: 120,
Migs: pack(
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
makeMig(withCompactTsCompaction(0, 0, 15, 29, "b")),
makeMig(withCompactTsCompaction(0, 0, 10, 20, "c")),
),

ExceptedCompactionsArtificateDir: pack("a", "b", "c"),
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
b := logclient.NewMigrationBuilder(c.ShiftedStartTs, c.ShiftedStartTs, c.RestoredTs)
i := b.Build(c.Migs)
require.ElementsMatch(t, i.CompactionDirs(), c.ExceptedCompactionsArtificateDir)
})
}
}
5 changes: 3 additions & 2 deletions br/pkg/stream/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,9 @@ func (m MigrationExt) doTruncating(ctx context.Context, mig *pb.Migration, resul
// NOTE: Execution of truncation wasn't implemented here.
// If we are going to truncate some files, for now we still need to use `br log truncate`.
for _, compaction := range mig.Compactions {
// Can we also remove the compaction when `until-ts` is equal to `truncated-to`...?
if compaction.CompactionUntilTs > mig.TruncatedTo {
// We can only clean up a compaction when we are sure all its inputs
// are no more used.
if compaction.InputMaxTs > mig.TruncatedTo {
result.NewBase.Compactions = append(result.NewBase.Compactions, compaction)
} else {
m.tryRemovePrefix(ctx, compaction.Artifacts, result)
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/stream/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,13 @@ func mDstrPfx(path ...string) migOP {
}
}

func mCompaction(cPath, aPath string, fromTs, untilTs uint64) migOP {
func mCompaction(cPath, aPath string, minTs, maxTs uint64) migOP {
return func(m *backuppb.Migration) {
c := &backuppb.LogFileCompaction{}
c.GeneratedFiles = cPath
c.Artifacts = aPath
c.CompactionFromTs = fromTs
c.CompactionUntilTs = untilTs
c.InputMinTs = minTs
c.InputMaxTs = maxTs
m.Compactions = append(m.Compactions, c)
}
}
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func (t trivialFlushStream) RecvMsg(m any) error {
return nil
}

func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) {
f.flush()
return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil
}

func (f *fakeStore) GetID() uint64 {
return f.id
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115 h1:tFaBKtuVsTaYgWVa4fJVBHEi3vqdqRtmjMypEK+CN88=
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=
Expand Down