Merge #133894
133894: crosscluster/producer: flush at lower threshold if consumer is ready r=dt a=msbutler

This patch allows the event stream to flush at a lower buffer threshold if the consumer is ready for more work. If the consumer is waiting for more work, the producer should hand it over promptly rather than let the consumer sit idle.

If the consumer is not ready, the event stream buffers up to a higher threshold, preventing a blocked flush from stalling rangefeeds and inducing catchup scans.
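
For illustration, a minimal standalone sketch of the resulting flush decision (the helper `shouldFlush` and the constant names here are assumptions for this example; the actual logic lives in `maybeFlushBatch` in event_stream.go and is driven by the `logical_replication.stream_batch_size` setting and the `minBatchByteSize` constant):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// The two thresholds described above (values mirror this patch: a 1 MiB
// minimum and the 16 MiB default of logical_replication.stream_batch_size).
const (
	minBatchByteSize = 1 << 20  // flush early when the consumer is idle
	maxBatchByteSize = 16 << 20 // otherwise batch up to the larger target
)

// shouldFlush restates the decision: always flush past the large threshold,
// and flush past the small threshold only when the consumer has signaled
// that it is blocked waiting for more work.
func shouldFlush(bufferedBytes int, consumerReady *atomic.Bool) bool {
	return bufferedBytes > maxBatchByteSize ||
		(consumerReady.Load() && bufferedBytes > minBatchByteSize)
}

func main() {
	var ready atomic.Bool

	ready.Store(true)                       // consumer is parked waiting for events
	fmt.Println(shouldFlush(2<<20, &ready)) // true: a small batch is enough

	ready.Store(false)                       // consumer is still ingesting the previous batch
	fmt.Println(shouldFlush(2<<20, &ready))  // false: keep buffering
	fmt.Println(shouldFlush(17<<20, &ready)) // true: hit the large threshold
}
```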

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
craig[bot] and msbutler committed Oct 30, 2024
2 parents ef26d15 + 7552cec commit 548e585
Showing 2 changed files with 15 additions and 2 deletions.
@@ -133,7 +133,7 @@ var batchSizeSetting = settings.RegisterByteSizeSetting(
 	settings.ApplicationLevel,
 	"logical_replication.stream_batch_size",
 	"target batch size for logical replication stream",
-	1<<20,
+	16<<20,
 )
 
 func newLogicalReplicationWriterProcessor(
15 changes: 14 additions & 1 deletion pkg/ccl/crosscluster/producer/event_stream.go
@@ -8,6 +8,7 @@ package producer
 import (
 	"context"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
@@ -68,6 +69,8 @@ type eventStream struct {
 	lastCheckpointLen int
 
 	debug streampb.DebugProducerStatus
+
+	consumerReady atomic.Bool
 }
 
 var quantize = settings.RegisterDurationSettingWithExplicitUnit(
@@ -233,6 +236,9 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
 	s.debug.Flushes.EmitWaitNanos.Add(emitWait)
 	s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())
 
+	s.consumerReady.Store(true)
+	defer s.consumerReady.Store(false)
+
 	select {
 	case <-ctx.Done():
 		return false, ctx.Err()
@@ -388,7 +394,13 @@ func (s *eventStream) sendCheckpoint(ctx context.Context, frontier rangefeed.Vis
 }
 
 func (s *eventStream) maybeFlushBatch(ctx context.Context) error {
-	if s.seb.size > int(s.spec.Config.BatchByteSize) {
+	// If the consumer is ready to ingest, flush at a lower threshold. This
+	// ensures the consumer always has work to do.
+	//
+	// If the consumer is not ready, the larger batch delays the flush call,
+	// preventing a slow consumer from blocking rangefeed progress and
+	// inducing catchup scans.
+	if (s.seb.size > int(s.spec.Config.BatchByteSize)) || s.consumerReady.Load() && s.seb.size > minBatchByteSize {
 		return s.flushBatch(ctx)
 	}
 	return nil
@@ -497,6 +509,7 @@ func (s *eventStream) validateProducerJobAndSpec(ctx context.Context) (roachpb.T
 }
 
 const defaultBatchSize = 1 << 20
+const minBatchByteSize = 1 << 20
 
 func streamPartition(
 	evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
