Skip to content

Commit

Permalink
changefeedccl: only checkpoint leading spans above lead threshold
Browse files Browse the repository at this point in the history
This patch updates the span-level checkpointing code to only checkpoint
spans that are above the lead threshold (formerly lag threshold) so that
we can at least maintain some minimal level of progress for spans that
we go through the effort of checkpointing.

Longer explanation: We create span-level checkpoints (at the configured
interval) if we are either in a backfill or there's at least one span
that is more than the lag/lead threshold ahead of the frontier min ts.
Before this change, in the latter case, we would checkpoint all spans
that are above the frontier min ts regardless of whether they are at
least the lag/lead threshold ahead. This change makes it so that when
we are creating a span-level checkpoint due to the lag/lead threshold,
we will only include spans that are ahead of the frontier min ts by at
least the lag/lead threshold so that we can maintain some minimal level
of progress for the spans that we checkpoint.

Release note: None
  • Loading branch information
andyyang890 committed Jan 14, 2025
1 parent a7809b1 commit bdd0cd4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 26 deletions.
15 changes: 11 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,9 +884,10 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu

// At a lower frequency, we checkpoint specific spans in the job progress
// either in backfills or if the highwater mark is excessively lagging behind.
inBackfill := ca.frontier.InBackfill(resolved)
hasLeadingSpans, _ := ca.frontier.HasLeadingSpans(sv)
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(ca.frontier.InBackfill(resolved) || ca.frontier.HasLeadingSpans(sv)) &&
canCheckpointSpans(sv, ca.lastSpanFlush)
(inBackfill || hasLeadingSpans) && canCheckpointSpans(sv, ca.lastSpanFlush)

if checkpointSpans {
defer func() {
Expand Down Expand Up @@ -1670,13 +1671,19 @@ func (cf *changeFrontier) maybeCheckpointJob(
// highwater mark remains fixed while other spans may significantly outpace
// it, therefore to avoid losing that progress on changefeed resumption we
// also store as many of those leading spans as we can in the job progress
updateCheckpoint := (inBackfill || cf.frontier.HasLeadingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()
hasLeadingSpans, leadTS := cf.frontier.HasLeadingSpans(&cf.js.settings.SV)
updateCheckpoint := (inBackfill || hasLeadingSpans) && cf.js.canCheckpointSpans()

// If the highwater has moved an empty checkpoint will be saved
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
frontier := cf.frontier.Frontier()
if hasLeadingSpans {
checkpoint = cf.frontier.MakeCheckpoint(leadTS, maxBytes)
} else {
checkpoint = cf.frontier.MakeCheckpoint(frontier, maxBytes)
}
}

if updateCheckpoint || updateHighWater {
Expand Down
25 changes: 14 additions & 11 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,30 @@ import (
type SpanIter func(forEachSpan span.Operation)

// Make creates a checkpoint with as many spans that should be checkpointed (are
// above the highwater mark) as can fit in maxBytes, along with the earliest
// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
// spans above the high-water mark.
// above the provided baseline timestamp) as can fit in maxBytes, along with the
// earliest timestamp of the checkpointed spans. A SpanGroup is used to merge
// adjacent spans to minimize the size of the checkpoint.
func Make(
frontier hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
baselineTS hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
) jobspb.ChangefeedProgress_Checkpoint {
// Collect leading spans into a SpanGroup to merge adjacent spans and store
// the lowest timestamp found
// the lowest timestamp found.
var checkpointSpanGroup roachpb.SpanGroup
checkpointFrontier := hlc.Timestamp{}
checkpointTS := hlc.MaxTimestamp
forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
if frontier.Less(ts) {
if baselineTS.Less(ts) {
checkpointSpanGroup.Add(s)
if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
checkpointFrontier = ts
if ts.Less(checkpointTS) {
checkpointTS = ts
}
}
return span.ContinueMatch
})
if checkpointSpanGroup.Len() == 0 {
return jobspb.ChangefeedProgress_Checkpoint{}
}

// Ensure we only return up to maxBytes spans
// Ensure we only return up to maxBytes spans.
var checkpointSpans []roachpb.Span
var used int64
for _, span := range checkpointSpanGroup.Slice() {
Expand All @@ -51,6 +54,6 @@ func Make(

return jobspb.ChangefeedProgress_Checkpoint{
Spans: checkpointSpans,
Timestamp: checkpointFrontier,
Timestamp: checkpointTS,
}
}
57 changes: 52 additions & 5 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func TestCheckpointMake(t *testing.T) {
}

for name, tc := range map[string]struct {
frontier hlc.Timestamp
spans checkpointSpans
maxBytes int64
expected jobspb.ChangefeedProgress_Checkpoint
frontier hlc.Timestamp
spans checkpointSpans
maxBytes int64
leadThreshold int64
expected jobspb.ChangefeedProgress_Checkpoint
}{
"all spans ahead of frontier checkpointed": {
frontier: ts(1),
Expand Down Expand Up @@ -126,6 +127,51 @@ func TestCheckpointMake(t *testing.T) {
Spans: []roachpb.Span{{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}},
},
},
"all spans ahead of frontier checkpointed due to lead threshold": {
frontier: ts(1),
spans: checkpointSpans{
{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
},
maxBytes: 100,
leadThreshold: 1,
expected: jobspb.ChangefeedProgress_Checkpoint{
Timestamp: ts(3),
Spans: []roachpb.Span{
{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")},
},
},
},
"only some spans ahead of frontier checkpointed due to lead threshold": {
frontier: ts(1),
spans: checkpointSpans{
{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
},
maxBytes: 100,
leadThreshold: 2,
expected: jobspb.ChangefeedProgress_Checkpoint{
Timestamp: ts(4),
Spans: []roachpb.Span{{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}},
},
},
"no spans ahead of frontier checkpointed due to lead threshold": {
frontier: ts(1),
spans: checkpointSpans{
{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
},
maxBytes: 100,
leadThreshold: 3,
expected: jobspb.ChangefeedProgress_Checkpoint{},
},
} {
t.Run(name, func(t *testing.T) {
forEachSpan := func(fn span.Operation) {
Expand All @@ -134,7 +180,8 @@ func TestCheckpointMake(t *testing.T) {
}
}

actual := checkpoint.Make(tc.frontier, forEachSpan, tc.maxBytes)
baselineTS := tc.frontier.Add(tc.leadThreshold, 0)
actual := checkpoint.Make(baselineTS, forEachSpan, tc.maxBytes)
require.Equal(t, tc.expected, actual)
})
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/ccl/changefeedccl/resolvedspan/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ func (f *CoordinatorFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
}

// MakeCheckpoint creates a checkpoint based on the current state of the frontier.
func (f *CoordinatorFrontier) MakeCheckpoint(maxBytes int64) jobspb.ChangefeedProgress_Checkpoint {
return checkpoint.Make(f.Frontier(), f.Entries, maxBytes)
func (f *CoordinatorFrontier) MakeCheckpoint(
baselineTS hlc.Timestamp, maxBytes int64,
) jobspb.ChangefeedProgress_Checkpoint {
return checkpoint.Make(baselineTS, f.Entries, maxBytes)
}

// spanFrontier is a type alias to make it possible to embed and forward calls
Expand Down Expand Up @@ -295,17 +297,19 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(

// HasLeadingSpans returns whether the frontier has lagging spans as defined
// by whether the frontier trails the latest timestamp by at least
// changefeedbase.SpanCheckpointLeadThreshold.
func (f *resolvedSpanFrontier) HasLeadingSpans(sv *settings.Values) bool {
// changefeedbase.SpanCheckpointLeadThreshold. If there are, it returns the
// lead timestamp (frontier + lead threshold).
func (f *resolvedSpanFrontier) HasLeadingSpans(sv *settings.Values) (bool, hlc.Timestamp) {
lagThresholdNanos := int64(changefeedbase.SpanCheckpointLeadThreshold.Get(sv))
if lagThresholdNanos == 0 {
return false
return false, hlc.Timestamp{}
}
frontier := f.Frontier()
if frontier.IsEmpty() {
frontier = f.statementTime
}
return frontier.Add(lagThresholdNanos, 0).Less(f.latestTS)
leadTS := frontier.Add(lagThresholdNanos, 0)
return leadTS.Less(f.latestTS), leadTS
}

// resolvedSpanBoundary encapsulates a resolved span boundary, which is
Expand Down

0 comments on commit bdd0cd4

Please sign in to comment.