changefeedccl: only checkpoint leading spans above lead threshold
This patch updates the span-level checkpointing code to only checkpoint
spans that are above the lead threshold (formerly lag threshold) so that
we can at least maintain some minimal level of progress for spans that
we go through the effort of checkpointing.

It also renames `changefeed.frontier_highwater_lag_checkpoint_threshold`
to `changefeed.span_checkpoint.lead_threshold` for consistency.

Release note (ops change): The cluster setting
`changefeed.frontier_highwater_lag_checkpoint_threshold` has been
renamed to `changefeed.span_checkpoint.lead_threshold`. The old name
remains available for backwards-compatibility.
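
The lead-threshold rule described above can be illustrated with a minimal sketch. It is not the code from this commit: `spanProgress` and `spansToCheckpoint` are hypothetical names, timestamps are simplified to plain `int64` values, and the merging of adjacent spans and the `maxBytes` cap are left out. Only spans strictly ahead of `frontier + leadThreshold` are selected, and the checkpoint timestamp is the minimum timestamp among them.

```go
package main

import "fmt"

// spanProgress is a hypothetical, simplified stand-in for a resolved span:
// a key range plus the timestamp it has been resolved to.
type spanProgress struct {
	start, end string
	ts         int64
}

// spansToCheckpoint returns the spans whose timestamp is strictly greater
// than frontier+leadThreshold, along with the minimum timestamp among the
// selected spans. It returns (nil, 0) when no span qualifies.
func spansToCheckpoint(
	frontier, leadThreshold int64, spans []spanProgress,
) ([]spanProgress, int64) {
	baseline := frontier + leadThreshold
	var selected []spanProgress
	var minTS int64
	for _, s := range spans {
		if s.ts > baseline {
			if len(selected) == 0 || s.ts < minTS {
				minTS = s.ts
			}
			selected = append(selected, s)
		}
	}
	return selected, minTS
}

func main() {
	// Same shape as the new test cases in this commit: frontier at 1,
	// spans resolved to 1, 3, 1, and 4.
	spans := []spanProgress{
		{"a", "b", 1}, {"b", "c", 3}, {"c", "d", 1}, {"d", "e", 4},
	}
	for _, threshold := range []int64{1, 2, 3} {
		selected, minTS := spansToCheckpoint(1, threshold, spans)
		fmt.Printf("threshold=%d selected=%d minTS=%d\n", threshold, len(selected), minTS)
	}
}
```

With a threshold of 1 both leading spans qualify, with 2 only the span at timestamp 4 does, and with 3 nothing is checkpointed, which mirrors the three test cases added in `checkpoint_test.go`.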
andyyang890 committed Jan 13, 2025
1 parent a5674dd commit d2d3f8f
Showing 8 changed files with 101 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
-changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
+changefeed.span_checkpoint.lead_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -22,7 +22,7 @@
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lead_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
15 changes: 11 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -884,9 +884,10 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu

 	// At a lower frequency, we checkpoint specific spans in the job progress
 	// either in backfills or if the highwater mark is excessively lagging behind.
+	inBackfill := ca.frontier.InBackfill(resolved)
+	hasLeadingSpans, _ := ca.frontier.HasLeadingSpans(sv)
 	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
-		(ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
-		canCheckpointSpans(sv, ca.lastSpanFlush)
+		(inBackfill || hasLeadingSpans) && canCheckpointSpans(sv, ca.lastSpanFlush)

 	if checkpointSpans {
 		defer func() {
@@ -1670,13 +1671,19 @@ func (cf *changeFrontier) maybeCheckpointJob(
 	// highwater mark remains fixed while other spans may significantly outpace
 	// it, therefore to avoid losing that progress on changefeed resumption we
 	// also store as many of those leading spans as we can in the job progress
-	updateCheckpoint := (inBackfill || cf.frontier.HasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()
+	hasLeadingSpans, leadTS := cf.frontier.HasLeadingSpans(&cf.js.settings.SV)
+	updateCheckpoint := (inBackfill || hasLeadingSpans) && cf.js.canCheckpointSpans()

 	// If the highwater has moved an empty checkpoint will be saved
 	var checkpoint jobspb.ChangefeedProgress_Checkpoint
 	if updateCheckpoint {
 		maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
-		checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
+		frontier := cf.frontier.Frontier()
+		if hasLeadingSpans {
+			checkpoint = cf.frontier.MakeCheckpoint(leadTS, maxBytes)
+		} else {
+			checkpoint = cf.frontier.MakeCheckpoint(frontier, maxBytes)
+		}
 	}

 	if updateCheckpoint || updateHighWater {
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -2340,7 +2340,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
 		context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
 	changefeedbase.SpanCheckpointMaxBytes.Override(
 		context.Background(), &s.ClusterSettings().SV, 100<<20)
-	changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override(
+	changefeedbase.SpanCheckpointLeadThreshold.Override(
 		context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)

 	// We'll start changefeed with the cursor.
16 changes: 10 additions & 6 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -79,16 +79,20 @@ var SpanCheckpointInterval = settings.RegisterDurationSetting(
 	settings.WithName("changefeed.span_checkpoint.interval"),
 )

-// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
-// mark is allowed to lag behind the leading edge of the frontier before we
-// begin to attempt checkpointing spans above the high-water mark
-var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
+// SpanCheckpointLeadThreshold controls the amount of time a changefeed's
+// leading spans must be ahead of its slowest spans before a span-level
+// checkpoint is written.
+var SpanCheckpointLeadThreshold = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"changefeed.frontier_highwater_lag_checkpoint_threshold",
-	"controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
+	"the amount of time a changefeed's leading spans must be ahead of its "+
+		"slowest spans before a span-level checkpoint to save leading span progress "+
+		"is written; if 0, span-level checkpoints due to leading spans is disabled",
 	10*time.Minute,
 	settings.NonNegativeDuration,
-	settings.WithPublic)
+	settings.WithPublic,
+	settings.WithName("changefeed.span_checkpoint.lead_threshold"),
+)

 // SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added
 // to a span-level checkpoint record.
25 changes: 14 additions & 11 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint.go
@@ -18,27 +18,30 @@ import (
 type SpanIter func(forEachSpan span.Operation)

 // Make creates a checkpoint with as many spans that should be checkpointed (are
-// above the highwater mark) as can fit in maxBytes, along with the earliest
-// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
-// spans above the high-water mark.
+// above the provided baseline timestamp) as can fit in maxBytes, along with the
+// earliest timestamp of the checkpointed spans. A SpanGroup is used to merge
+// adjacent spans to minimize the size of the checkpoint.
 func Make(
-	frontier hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
+	baselineTS hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
 ) jobspb.ChangefeedProgress_Checkpoint {
 	// Collect leading spans into a SpanGroup to merge adjacent spans and store
-	// the lowest timestamp found
+	// the lowest timestamp found.
 	var checkpointSpanGroup roachpb.SpanGroup
-	checkpointFrontier := hlc.Timestamp{}
+	checkpointTS := hlc.MaxTimestamp
 	forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
-		if frontier.Less(ts) {
+		if baselineTS.Less(ts) {
 			checkpointSpanGroup.Add(s)
-			if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
-				checkpointFrontier = ts
+			if ts.Less(checkpointTS) {
+				checkpointTS = ts
 			}
 		}
 		return span.ContinueMatch
 	})
+	if checkpointSpanGroup.Len() == 0 {
+		return jobspb.ChangefeedProgress_Checkpoint{}
+	}

-	// Ensure we only return up to maxBytes spans
+	// Ensure we only return up to maxBytes spans.
 	var checkpointSpans []roachpb.Span
 	var used int64
 	for _, span := range checkpointSpanGroup.Slice() {
@@ -51,6 +54,6 @@ func Make(

 	return jobspb.ChangefeedProgress_Checkpoint{
 		Spans:     checkpointSpans,
-		Timestamp: checkpointFrontier,
+		Timestamp: checkpointTS,
 	}
 }
57 changes: 52 additions & 5 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
@@ -52,10 +52,11 @@ func TestCheckpointMake(t *testing.T) {
 	}

 	for name, tc := range map[string]struct {
-		frontier hlc.Timestamp
-		spans    checkpointSpans
-		maxBytes int64
-		expected jobspb.ChangefeedProgress_Checkpoint
+		frontier      hlc.Timestamp
+		spans         checkpointSpans
+		maxBytes      int64
+		leadThreshold int64
+		expected      jobspb.ChangefeedProgress_Checkpoint
 	}{
 		"all spans ahead of frontier checkpointed": {
 			frontier: ts(1),
@@ -126,6 +127,51 @@ func TestCheckpointMake(t *testing.T) {
 				Spans: []roachpb.Span{{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}},
 			},
 		},
+		"all spans ahead of frontier checkpointed due to lead threshold": {
+			frontier: ts(1),
+			spans: checkpointSpans{
+				{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+				{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+			},
+			maxBytes: 100,
+			leadThreshold: 1,
+			expected: jobspb.ChangefeedProgress_Checkpoint{
+				Timestamp: ts(3),
+				Spans: []roachpb.Span{
+					{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
+					{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")},
+				},
+			},
+		},
+		"only some spans ahead of frontier checkpointed due to lead threshold": {
+			frontier: ts(1),
+			spans: checkpointSpans{
+				{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+				{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+			},
+			maxBytes: 100,
+			leadThreshold: 2,
+			expected: jobspb.ChangefeedProgress_Checkpoint{
+				Timestamp: ts(4),
+				Spans: []roachpb.Span{{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}},
+			},
+		},
+		"no spans ahead of frontier checkpointed due to lead threshold": {
+			frontier: ts(1),
+			spans: checkpointSpans{
+				{span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+				{span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+				{span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+			},
+			maxBytes: 100,
+			leadThreshold: 3,
+			expected: jobspb.ChangefeedProgress_Checkpoint{},
+		},
 	} {
 		t.Run(name, func(t *testing.T) {
 			forEachSpan := func(fn span.Operation) {
@@ -134,7 +180,8 @@ func TestCheckpointMake(t *testing.T) {
 				}
 			}

-			actual := checkpoint.Make(tc.frontier, forEachSpan, tc.maxBytes)
+			baselineTS := tc.frontier.Add(tc.leadThreshold, 0)
+			actual := checkpoint.Make(baselineTS, forEachSpan, tc.maxBytes)
 			require.Equal(t, tc.expected, actual)
 		})
 	}
18 changes: 11 additions & 7 deletions pkg/ccl/changefeedccl/resolvedspan/frontier.go
@@ -170,7 +170,9 @@ func (f *CoordinatorFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
 }

 // MakeCheckpoint creates a checkpoint based on the current state of the frontier.
-func (f *CoordinatorFrontier) MakeCheckpoint(maxBytes int64) jobspb.ChangefeedProgress_Checkpoint {
+func (f *CoordinatorFrontier) MakeCheckpoint(
+	baselineTS hlc.Timestamp, maxBytes int64,
+) jobspb.ChangefeedProgress_Checkpoint {
 	return checkpoint.Make(f.Frontier(), f.Entries, maxBytes)
 }

@@ -293,19 +295,21 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(
 	return nil
 }

-// HasLaggingSpans returns whether the frontier has lagging spans as defined
+// HasLeadingSpans returns whether the frontier has lagging spans as defined
 // by whether the frontier trails the latest timestamp by at least
-// changefeedbase.FrontierHighwaterLagCheckpointThreshold.
-func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
-	lagThresholdNanos := int64(changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv))
+// changefeedbase.SpanCheckpointLeadThreshold. If there are, it returns the
+// lead timestamp (frontier + lead threshold).
+func (f *resolvedSpanFrontier) HasLeadingSpans(sv *settings.Values) (bool, hlc.Timestamp) {
+	lagThresholdNanos := int64(changefeedbase.SpanCheckpointLeadThreshold.Get(sv))
 	if lagThresholdNanos == 0 {
-		return false
+		return false, hlc.Timestamp{}
 	}
 	frontier := f.Frontier()
 	if frontier.IsEmpty() {
 		frontier = f.statementTime
 	}
-	return frontier.Add(lagThresholdNanos, 0).Less(f.latestTS)
+	leadTS := frontier.Add(lagThresholdNanos, 0)
+	return leadTS.Less(f.latestTS), leadTS
 }

 // resolvedSpanBoundary encapsulates a resolved span boundary, which is