Skip to content

Commit

Permalink
br: revert backup progress (pingcap#53914)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leavrth authored and ti-chi-bot committed Jan 6, 2025
1 parent 7362832 commit a63670f
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 101 deletions.
51 changes: 34 additions & 17 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
// MaxResolveLocksbackupOffSleep is the maximum sleep time for resolving locks.
// 10 minutes for every round.
MaxResolveLocksbackupOffSleepMs = 600000

IncompleteRangesUpdateInterval = time.Second * 15
)

// ClientMgr manages connections needed by backup.
Expand All @@ -69,6 +71,13 @@ type Checksum struct {
// ProgressUnit represents the unit of progress.
type ProgressUnit string

const (
// UnitRange represents the progress updated counter when a range finished.
UnitRange ProgressUnit = "range"
// UnitRegion represents the progress updated counter when a region finished.
UnitRegion ProgressUnit = "region"
)

type MainBackupLoop struct {
BackupSender

Expand All @@ -82,7 +91,7 @@ type MainBackupLoop struct {
ReplicaReadLabel map[string]string
StateNotifier chan BackupRetryPolicy

ProgressCallBack func()
ProgressCallBack func(ProgressUnit)
GetBackupClientCallBack func(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error)
}

Expand Down Expand Up @@ -176,6 +185,9 @@ func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error {
round := uint64(0)
// reset grpc connection every round except key_locked error.
reset := true
// update incompleteRanges to advance the progress and the request.
incompleteRangesUpdateTicker := time.NewTicker(IncompleteRangesUpdateInterval)
defer incompleteRangesUpdateTicker.Stop()
mainLoop:
for {
round += 1
Expand Down Expand Up @@ -211,8 +223,7 @@ mainLoop:
mainCancel()
return ctx.Err()
default:
iter := loop.GlobalProgressTree.Iter()
inCompleteRanges = iter.GetIncompleteRanges()
inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges()
if len(inCompleteRanges) == 0 {
// all range backuped
logutil.CL(ctx).Info("This round finished all backup ranges", zap.Uint64("round", round))
Expand Down Expand Up @@ -259,13 +270,21 @@ mainLoop:
}
// infinite loop to collect region backup response to global channel
loop.CollectStoreBackupsAsync(handleCtx, round, storeBackupResultChMap, globalBackupResultCh)
incompleteRangesUpdateTicker.Reset(IncompleteRangesUpdateInterval)
handleLoop:
for {
select {
case <-ctx.Done():
handleCancel()
mainCancel()
return ctx.Err()
case <-incompleteRangesUpdateTicker.C:
startUpdate := time.Now()
inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges()
loop.BackupReq.SubRanges = getBackupRanges(inCompleteRanges)
elapsed := time.Since(startUpdate)
log.Info("update the incomplete ranges", zap.Duration("take", elapsed))
incompleteRangesUpdateTicker.Reset(max(5*elapsed, IncompleteRangesUpdateInterval))
case storeBackupInfo := <-loop.StateNotifier:
if storeBackupInfo.All {
logutil.CL(mainCtx).Info("cluster state changed. restart store backups", zap.Uint64("round", round))
Expand Down Expand Up @@ -352,7 +371,7 @@ mainLoop:
if lock != nil {
allTxnLocks = append(allTxnLocks, lock)
}
loop.ProgressCallBack()
loop.ProgressCallBack(UnitRegion)
}
}
}
Expand Down Expand Up @@ -562,7 +581,7 @@ func (bc *Client) StartCheckpointRunner(
backupTS uint64,
ranges []rtree.Range,
safePointID string,
progressCallBack func(),
progressCallBack func(ProgressUnit),
) (err error) {
if bc.checkpointMeta == nil {
bc.checkpointMeta = &checkpoint.CheckpointMetadataForBackup{
Expand Down Expand Up @@ -603,31 +622,28 @@ func (bc *Client) getProgressRange(r rtree.Range) *rtree.ProgressRange {
if bc.checkpointMeta != nil && len(bc.checkpointMeta.CheckpointDataMap) > 0 {
rangeTree, exists := bc.checkpointMeta.CheckpointDataMap[groupKey]
if exists {
incomplete := rangeTree.GetIncompleteRange(r.StartKey, r.EndKey)
delete(bc.checkpointMeta.CheckpointDataMap, groupKey)
return &rtree.ProgressRange{
Res: rangeTree,
Incomplete: incomplete,
Origin: r,
GroupKey: groupKey,
Res: rangeTree,
Origin: r,
GroupKey: groupKey,
Complete: false,
}
}
}

// the origin range are not recorded in checkpoint
// return the default progress range
return &rtree.ProgressRange{
Res: rtree.NewRangeTree(),
Incomplete: []rtree.Range{
r,
},
Res: rtree.NewRangeTree(),
Origin: r,
GroupKey: groupKey,
Complete: false,
}
}

// LoadCheckpointRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges.
func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func()) (map[string]rtree.RangeTree, error) {
func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func(ProgressUnit)) (map[string]rtree.RangeTree, error) {
rangeDataMap := make(map[string]rtree.RangeTree)

pastDureTime, err := checkpoint.WalkCheckpointFileForBackup(ctx, bc.storage, bc.cipher, func(groupKey string, rg checkpoint.BackupValueType) {
Expand All @@ -637,7 +653,7 @@ func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack fun
rangeDataMap[groupKey] = rangeTree
}
rangeTree.Put(rg.StartKey, rg.EndKey, rg.Files)
progressCallBack()
progressCallBack(UnitRegion)
})

// we should adjust start-time of the summary to `pastDureTime` earlier
Expand Down Expand Up @@ -1082,7 +1098,7 @@ func (bc *Client) BackupRanges(
concurrency uint,
replicaReadLabel map[string]string,
metaWriter *metautil.MetaWriter,
progressCallBack func(),
progressCallBack func(ProgressUnit),
) error {
log.Info("Backup Ranges Started", rtree.ZapRanges(ranges))
init := time.Now()
Expand All @@ -1101,6 +1117,7 @@ func (bc *Client) BackupRanges(
if err != nil {
return errors.Trace(err)
}
globalProgressTree.SetCallBack(func() { progressCallBack(UnitRange) })

stateNotifier := make(chan BackupRetryPolicy)
ObserveStoreChangesAsync(ctx, stateNotifier, bc.mgr.GetPDClient())
Expand Down
14 changes: 7 additions & 7 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestOnBackupResponse(t *testing.T) {
require.NoError(t, err)
require.Nil(t, lock)

incomplete := tree.Iter().GetIncompleteRanges()
incomplete := tree.GetIncompleteRanges()
require.Len(t, incomplete, 1)
require.Equal(t, []byte("b"), incomplete[0].StartKey)
require.Equal(t, []byte("c"), incomplete[0].EndKey)
Expand All @@ -398,7 +398,7 @@ func TestOnBackupResponse(t *testing.T) {
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
incomplete = tree.Iter().GetIncompleteRanges()
incomplete = tree.GetIncompleteRanges()
require.Len(t, incomplete, 0)

// case #5: failed case, key is locked
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}

Expand Down Expand Up @@ -548,7 +548,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}

Expand Down Expand Up @@ -600,7 +600,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
":rtree",
"//pkg/kv",
Expand Down
73 changes: 28 additions & 45 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,11 @@ func (rangeTree *RangeTree) GetIncompleteRange(
}

type ProgressRange struct {
Res RangeTree
Incomplete []Range
Origin Range
GroupKey string
Res RangeTree
Origin Range
GroupKey string
// only for statistic
Complete bool
}

// Less impls btree.Item.
Expand All @@ -332,15 +333,24 @@ func (pr *ProgressRange) Less(than *ProgressRange) bool {
// All the progress ranges it sorted do not overlap.
type ProgressRangeTree struct {
*btree.BTreeG[*ProgressRange]

completeCallBack func()
}

// NewProgressRangeTree returns an empty range tree.
func NewProgressRangeTree() ProgressRangeTree {
return ProgressRangeTree{
BTreeG: btree.NewG[*ProgressRange](32, (*ProgressRange).Less),

completeCallBack: func() {},
}
}

// SetCallBack set the complete call back to update the progress.
func (rangeTree *ProgressRangeTree) SetCallBack(callback func()) {
rangeTree.completeCallBack = callback
}

// find is a helper function to find an item that contains the range.
func (rangeTree *ProgressRangeTree) find(pr *ProgressRange) *ProgressRange {
var ret *ProgressRange
Expand Down Expand Up @@ -392,50 +402,23 @@ func (rangeTree *ProgressRangeTree) FindContained(startKey, endKey []byte) (*Pro
return ret, nil
}

type incompleteRangesFetcherItem struct {
pr *ProgressRange
complete bool
}

type IncompleteRangesFetcher struct {
items []*incompleteRangesFetcherItem
left int
}

func (rangeTree *ProgressRangeTree) Iter() *IncompleteRangesFetcher {
items := make([]*incompleteRangesFetcherItem, 0, rangeTree.Len())
rangeTree.Ascend(func(item *ProgressRange) bool {
items = append(items, &incompleteRangesFetcherItem{
pr: item,
complete: false,
})
return true
})
return &IncompleteRangesFetcher{
items: items,
left: len(items),
}
}

func (iter *IncompleteRangesFetcher) GetIncompleteRanges() []Range {
func (rangeTree *ProgressRangeTree) GetIncompleteRanges() []Range {
// about 64 MB memory if there are 1 million ranges
incompleteRanges := make([]Range, 0, len(iter.items))
for _, item := range iter.items {
if item.complete {
continue
}

incomplete := item.pr.Res.GetIncompleteRange(item.pr.Origin.StartKey, item.pr.Origin.EndKey)
incompleteRanges := make([]Range, 0, rangeTree.Len())
rangeTree.Ascend(func(item *ProgressRange) bool {
// NOTE: maybe there is a late response whose range overlaps with an existing item, which
// may cause the complete range tree to become incomplete. Therefore, `item.Complete` is
// only for statistic.
incomplete := item.Res.GetIncompleteRange(item.Origin.StartKey, item.Origin.EndKey)
if len(incomplete) == 0 {
item.complete = true
iter.left -= 1
continue
if !item.Complete {
item.Complete = true
rangeTree.completeCallBack()
}
return true
}
incompleteRanges = append(incompleteRanges, incomplete...)
}
return true
})
return incompleteRanges
}

func (iter *IncompleteRangesFetcher) Len() int {
return iter.left
}
39 changes: 35 additions & 4 deletions br/pkg/rtree/rtree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, prTree.Insert(buildProgressRange("cc", "dd")))
require.NoError(t, prTree.Insert(buildProgressRange("ee", "ff")))

prIter := prTree.Iter()
ranges := prIter.GetIncompleteRanges()
ranges := prTree.GetIncompleteRanges()
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("cc")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("cc"), EndKey: []byte("dd")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2])
Expand All @@ -253,7 +252,7 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, err)
pr.Res.Put([]byte("cc"), []byte("dd"), nil)

ranges = prIter.GetIncompleteRanges()
ranges = prTree.GetIncompleteRanges()
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("aaa")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("b"), EndKey: []byte("cc")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2])
Expand All @@ -270,6 +269,38 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, err)
pr.Res.Put([]byte("ee"), []byte("ff"), nil)

ranges = prIter.GetIncompleteRanges()
ranges = prTree.GetIncompleteRanges()
require.Equal(t, 0, len(ranges))
}

func TestProgreeRangeTreeCallBack(t *testing.T) {
prTree := rtree.NewProgressRangeTree()

require.NoError(t, prTree.Insert(buildProgressRange("a", "b")))
require.NoError(t, prTree.Insert(buildProgressRange("c", "d")))
require.NoError(t, prTree.Insert(buildProgressRange("e", "f")))

completeCount := 0
prTree.SetCallBack(func() { completeCount += 1 })

pr, err := prTree.FindContained([]byte("a"), []byte("b"))
require.NoError(t, err)
pr.Res.Put([]byte("a"), []byte("b"), nil)
ranges := prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1])

pr.Res.Put([]byte("a"), []byte("aa"), nil)
ranges = prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("b")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[2])

pr.Res.Put([]byte("a"), []byte("b"), nil)
ranges = prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1])
}
Loading

0 comments on commit a63670f

Please sign in to comment.