diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 0141741598e89..cf98acefa8e56 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -83,7 +83,7 @@ go_test( embed = [":copr"], flaky = True, race = "on", - shard_count = 30, + shard_count = 31, deps = [ "//pkg/kv", "//pkg/store/driver/backoff", diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index b6d59141e82dc..8a019be428906 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -572,10 +572,17 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) { // disable paging for batched task. b.tasks[idx].paging = false b.tasks[idx].pagingSize = 0 + // The task and it's batched can be served only in the store we chose. + // If the task is redirected to other replica, the batched task may not meet region-miss or store-not-match error. + // So disable busy threshold for the task which carries batched tasks. + b.tasks[idx].busyThreshold = 0 } if task.RowCountHint > 0 { b.tasks[idx].RowCountHint += task.RowCountHint } + batchedTask.task.paging = false + batchedTask.task.pagingSize = 0 + batchedTask.task.busyThreshold = 0 b.tasks[idx].batchTaskList[task.taskID] = batchedTask } handled = true diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index a3fef83fd785b..d9fa5933fd703 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -17,6 +17,7 @@ package copr import ( "context" "testing" + "time" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/tidb/pkg/kv" @@ -881,3 +882,53 @@ func TestSmallTaskConcurrencyLimit(t *testing.T) { require.Equal(t, smallConcPerCore, conc) require.Equal(t, smallTaskCount, count) } + +func TestBatchStoreCoprOnlySendToLeader(t *testing.T) { + // nil --- 'g' --- 'n' --- 't' --- nil + // <- 0 -> <- 1 -> <- 2 -> <- 3 -> + mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + _, _, _ = testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + defer pdCli.Close() + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) + req := &kv.Request{ + StoreBatchSize: 3, + StoreBusyThreshold: time.Second, + } + ranges := buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") + tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + rowHints: []int{1, 1, 3, 3}, + }) + require.Len(t, tasks, 1) + require.Zero(t, tasks[0].busyThreshold) + batched := tasks[0].batchTaskList + require.Len(t, batched, 3) + for _, task := range batched { + require.Zero(t, task.task.busyThreshold) + } + + req = &kv.Request{ + StoreBatchSize: 0, + StoreBusyThreshold: time.Second, + } + tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + rowHints: []int{1, 1, 3, 3}, + }) + require.Len(t, tasks, 4) + for _, task := range tasks { + require.Equal(t, task.busyThreshold, time.Second) + } +} diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 7129cb2e061cd..01fa91deec103 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -16,9 +16,7 @@ package copr import ( "bytes" - "math" "strconv" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -285,57 +283,25 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store // BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`. func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) { - var ( - rpcContext *tikv.RPCContext - err error - ) - if replicaRead == kv.ReplicaReadFollower { - followerStoreSeed := uint32(0) - leastEstWaitTime := time.Duration(math.MaxInt64) - var ( - firstFollowerPeer *uint64 - followerContext *tikv.RPCContext - ) - for { - followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed) - if err != nil { - return nil, err - } - if firstFollowerPeer == nil { - firstFollowerPeer = &rpcContext.Peer.Id - } else if *firstFollowerPeer == rpcContext.Peer.Id { - break - } - estWaitTime := followerContext.Store.EstimatedWaitTime() - // the wait time of this follower is under given threshold, choose it. - if estWaitTime > req.StoreBusyThreshold { - continue - } - if rpcContext == nil { - rpcContext = followerContext - } else if estWaitTime < leastEstWaitTime { - leastEstWaitTime = estWaitTime - rpcContext = followerContext - } - followerStoreSeed++ - } - // all replicas are busy, fallback to leader. - if rpcContext == nil { - replicaRead = kv.ReplicaReadLeader - } + if replicaRead != kv.ReplicaReadLeader { + return nil, nil } - if replicaRead == kv.ReplicaReadLeader { - rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0) - if err != nil { - return nil, err - } + rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0) + if err != nil { + return nil, err } // fallback to non-batch path if rpcContext == nil { return nil, nil } + + // when leader is busy, we don't batch the cop task to allow the load balance to work. + if rpcContext.Store.EstimatedWaitTime() > req.StoreBusyThreshold { + return nil, nil + } + return &batchedCopTask{ task: task, region: coprocessor.RegionInfo{