Skip to content

Commit

Permalink
br: resolve stuck in backup (#54736) (#55403)
Browse files Browse the repository at this point in the history
close #53480
  • Loading branch information
ti-chi-bot authored Jan 10, 2025
1 parent ee8e3d9 commit 2b5884c
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 2 deletions.
4 changes: 3 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ go_test(
"client_test.go",
"main_test.go",
"schema_test.go",
"store_test.go",
],
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 13,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand Down Expand Up @@ -97,6 +98,7 @@ go_test(
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_opencensus_go//stats/view",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_multierr//:multierr",
],
Expand Down
65 changes: 64 additions & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,12 +1374,74 @@ func (bc *Client) handleFineGrained(
return backoffMill, nil
}

// timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`.
type timeoutRecv struct {
wg sync.WaitGroup
parentCtx context.Context
cancel context.CancelCauseFunc

refresh chan struct{}
}

// Refresh the timeout ticker
func (trecv *timeoutRecv) Refresh() {
select {
case <-trecv.parentCtx.Done():
case trecv.refresh <- struct{}{}:
}
}

// Stop the timeout ticker
func (trecv *timeoutRecv) Stop() {
close(trecv.refresh)
trecv.wg.Wait()
trecv.cancel(nil)
}

var TimeoutOneResponse = time.Hour

func (trecv *timeoutRecv) loop(timeout time.Duration) {
defer trecv.wg.Done()
ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
ticker.Reset(timeout)
select {
case <-trecv.parentCtx.Done():
return
case _, ok := <-trecv.refresh:
if !ok {
return
}
case <-ticker.C:
log.Warn("receive a backup response timeout")
trecv.cancel(errors.Errorf("receive a backup response timeout"))
}
}
}

func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) {
cctx, cancel := context.WithCancelCause(ctx)
trecv := &timeoutRecv{
parentCtx: ctx,
cancel: cancel,
refresh: make(chan struct{}),
}
trecv.wg.Add(1)
go trecv.loop(timeout)
return cctx, trecv
}

func doSendBackup(
ctx context.Context,
pctx context.Context,
client backuppb.BackupClient,
req backuppb.BackupRequest,
respFn func(*backuppb.BackupResponse) error,
) error {
// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
// terminate the backup if it does not receive any new response for a long time.
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse)
defer timerecv.Stop()
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
Expand Down Expand Up @@ -1424,6 +1486,7 @@ func doSendBackup(

for {
resp, err := bCli.Recv()
timerecv.Refresh()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
logutil.CL(ctx).Debug("backup streaming finish",
Expand Down
108 changes: 108 additions & 0 deletions br/pkg/backup/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backup

import (
"context"
"io"
"testing"
"time"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

type MockBackupClient struct {
backuppb.BackupClient

recvFunc func(context.Context) (*backuppb.BackupResponse, error)
}

func (mbc *MockBackupClient) Backup(ctx context.Context, _ *backuppb.BackupRequest, _ ...grpc.CallOption) (backuppb.Backup_BackupClient, error) {
return &MockBackupBackupClient{ctx: ctx, recvFunc: mbc.recvFunc}, nil
}

type MockBackupBackupClient struct {
backuppb.Backup_BackupClient

ctx context.Context
recvFunc func(context.Context) (*backuppb.BackupResponse, error)
}

func (mbbc *MockBackupBackupClient) CloseSend() error {
return nil
}

func (mbbc *MockBackupBackupClient) Recv() (*backuppb.BackupResponse, error) {
if mbbc.recvFunc != nil {
return mbbc.recvFunc(mbbc.ctx)
}
return &backuppb.BackupResponse{}, nil
}

func TestTimeoutRecv(t *testing.T) {
ctx := context.Background()
TimeoutOneResponse = time.Millisecond * 800
// Just Timeout Once
{
err := doSendBackup(ctx, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}

// Timeout Not At First
{
count := 0
err := doSendBackup(ctx, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
require.NoError(t, ctx.Err())
if count == 15 {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
}
count += 1
time.Sleep(time.Millisecond * 80)
return &backuppb.BackupResponse{}, nil
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}
}

func TestTimeoutRecvCancel(t *testing.T) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

_, trecv := StartTimeoutRecv(cctx, time.Hour)
cancel()
trecv.wg.Wait()
}

func TestTimeoutRecvCanceled(t *testing.T) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)
defer cancel()

tctx, trecv := StartTimeoutRecv(cctx, time.Hour)
trecv.Stop()
require.Equal(t, "context canceled", tctx.Err().Error())
}

0 comments on commit 2b5884c

Please sign in to comment.