diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index b319585a02da..b78ebb5b272f 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
-changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
+changefeed.span_checkpoint.lead_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 4c11feeb0ea4..08cbcb0e22e8 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -22,7 +22,7 @@
changefeed.event_consumer_worker_queue_size
| integer | 16 | if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer | Serverless/Dedicated/Self-Hosted |
changefeed.event_consumer_workers
| integer | 0 | the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled | Serverless/Dedicated/Self-Hosted |
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation | Serverless/Dedicated/Self-Hosted |
-changefeed.frontier_highwater_lag_checkpoint_threshold
| duration | 10m0s | controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled | Serverless/Dedicated/Self-Hosted |
+changefeed.span_checkpoint.lead_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold)
| duration | 10m0s | the amount of time a changefeed's leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled | Serverless/Dedicated/Self-Hosted |
changefeed.memory.per_changefeed_limit
| byte size | 512 MiB | controls amount of data that can be buffered per changefeed | Serverless/Dedicated/Self-Hosted |
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance)
| duration | 0s | minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress | Serverless/Dedicated/Self-Hosted |
changefeed.node_throttle_config
| string |
| specifies node level throttling configuration for all changefeeeds | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 94630f8fb3ce..8fbece827a67 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -884,9 +884,10 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
// At a lower frequency, we checkpoint specific spans in the job progress
// either in backfills or if the highwater mark is excessively lagging behind.
+ inBackfill := ca.frontier.InBackfill(resolved)
+ hasLeadingSpans, _ := ca.frontier.HasLeadingSpans(sv)
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
- (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
- canCheckpointSpans(sv, ca.lastSpanFlush)
+ (inBackfill || hasLeadingSpans) && canCheckpointSpans(sv, ca.lastSpanFlush)
if checkpointSpans {
defer func() {
@@ -1670,13 +1671,19 @@ func (cf *changeFrontier) maybeCheckpointJob(
// highwater mark remains fixed while other spans may significantly outpace
// it, therefore to avoid losing that progress on changefeed resumption we
// also store as many of those leading spans as we can in the job progress
- updateCheckpoint := (inBackfill || cf.frontier.HasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()
+ hasLeadingSpans, leadTS := cf.frontier.HasLeadingSpans(&cf.js.settings.SV)
+ updateCheckpoint := (inBackfill || hasLeadingSpans) && cf.js.canCheckpointSpans()
// If the highwater has moved an empty checkpoint will be saved
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
- checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
+ frontier := cf.frontier.Frontier()
+ if hasLeadingSpans {
+ checkpoint = cf.frontier.MakeCheckpoint(leadTS, maxBytes)
+ } else {
+ checkpoint = cf.frontier.MakeCheckpoint(frontier, maxBytes)
+ }
}
if updateCheckpoint || updateHighWater {
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 94222df075aa..924ab49da89e 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -2340,7 +2340,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.ClusterSettings().SV, 100<<20)
- changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override(
+ changefeedbase.SpanCheckpointLeadThreshold.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
// We'll start changefeed with the cursor.
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 6fd5d26e101e..f3c929e44c9d 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -79,16 +79,20 @@ var SpanCheckpointInterval = settings.RegisterDurationSetting(
settings.WithName("changefeed.span_checkpoint.interval"),
)
-// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
-// mark is allowed to lag behind the leading edge of the frontier before we
-// begin to attempt checkpointing spans above the high-water mark
-var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
+// SpanCheckpointLeadThreshold controls the amount of time a changefeed's
+// leading spans must be ahead of its slowest spans before a span-level
+// checkpoint is written.
+var SpanCheckpointLeadThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_highwater_lag_checkpoint_threshold",
- "controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
+ "the amount of time a changefeed's leading spans must be ahead of its "+
+ "slowest spans before a span-level checkpoint to save leading span progress "+
+ "is written; if 0, span-level checkpoints due to leading spans is disabled",
10*time.Minute,
settings.NonNegativeDuration,
- settings.WithPublic)
+ settings.WithPublic,
+ settings.WithName("changefeed.span_checkpoint.lead_threshold"),
+)
// SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added
// to a span-level checkpoint record.
diff --git a/pkg/ccl/changefeedccl/checkpoint/checkpoint.go b/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
index 6230a9455f8e..ba12d518a408 100644
--- a/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
+++ b/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
@@ -18,27 +18,30 @@ import (
type SpanIter func(forEachSpan span.Operation)
// Make creates a checkpoint with as many spans that should be checkpointed (are
-// above the highwater mark) as can fit in maxBytes, along with the earliest
-// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
-// spans above the high-water mark.
+// above the provided baseline timestamp) as can fit in maxBytes, along with the
+// earliest timestamp of the checkpointed spans. A SpanGroup is used to merge
+// adjacent spans to minimize the size of the checkpoint.
func Make(
- frontier hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
+ baselineTS hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
) jobspb.ChangefeedProgress_Checkpoint {
// Collect leading spans into a SpanGroup to merge adjacent spans and store
- // the lowest timestamp found
+ // the lowest timestamp found.
var checkpointSpanGroup roachpb.SpanGroup
- checkpointFrontier := hlc.Timestamp{}
+ checkpointTS := hlc.MaxTimestamp
forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
- if frontier.Less(ts) {
+ if baselineTS.Less(ts) {
checkpointSpanGroup.Add(s)
- if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
- checkpointFrontier = ts
+ if ts.Less(checkpointTS) {
+ checkpointTS = ts
}
}
return span.ContinueMatch
})
+ if checkpointSpanGroup.Len() == 0 {
+ return jobspb.ChangefeedProgress_Checkpoint{}
+ }
- // Ensure we only return up to maxBytes spans
+ // Ensure we only return up to maxBytes spans.
var checkpointSpans []roachpb.Span
var used int64
for _, span := range checkpointSpanGroup.Slice() {
@@ -51,6 +54,6 @@ func Make(
return jobspb.ChangefeedProgress_Checkpoint{
Spans: checkpointSpans,
- Timestamp: checkpointFrontier,
+ Timestamp: checkpointTS,
}
}
diff --git a/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go b/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
index 585da42e1d38..612aa56fc341 100644
--- a/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
+++ b/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
@@ -52,10 +52,11 @@ func TestCheckpointMake(t *testing.T) {
}
for name, tc := range map[string]struct {
- frontier hlc.Timestamp
- spans checkpointSpans
- maxBytes int64
- expected jobspb.ChangefeedProgress_Checkpoint
+ frontier hlc.Timestamp
+ spans checkpointSpans
+ maxBytes int64
+ leadThreshold int64
+ expected jobspb.ChangefeedProgress_Checkpoint
}{
"all spans ahead of frontier checkpointed": {
frontier: ts(1),
@@ -126,6 +127,51 @@ func TestCheckpointMake(t *testing.T) {
Spans: []roachpb.Span{{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}},
},
},
+ "all spans ahead of frontier checkpointed due to lead threshold": {
+ frontier: ts(1),
+ spans: checkpointSpans{
+ {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+ {span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+ },
+ maxBytes: 100,
+ leadThreshold: 1,
+ expected: jobspb.ChangefeedProgress_Checkpoint{
+ Timestamp: ts(3),
+ Spans: []roachpb.Span{
+ {Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
+ {Key: roachpb.Key("d"), EndKey: roachpb.Key("e")},
+ },
+ },
+ },
+ "only some spans ahead of frontier checkpointed due to lead threshold": {
+ frontier: ts(1),
+ spans: checkpointSpans{
+ {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+ {span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+ },
+ maxBytes: 100,
+ leadThreshold: 2,
+ expected: jobspb.ChangefeedProgress_Checkpoint{
+ Timestamp: ts(4),
+ Spans: []roachpb.Span{{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}},
+ },
+ },
+ "no spans ahead of frontier checkpointed due to lead threshold": {
+ frontier: ts(1),
+ spans: checkpointSpans{
+ {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, ts: ts(3)},
+ {span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, ts: ts(1)},
+ {span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, ts: ts(4)},
+ },
+ maxBytes: 100,
+ leadThreshold: 3,
+ expected: jobspb.ChangefeedProgress_Checkpoint{},
+ },
} {
t.Run(name, func(t *testing.T) {
forEachSpan := func(fn span.Operation) {
@@ -134,7 +180,8 @@ func TestCheckpointMake(t *testing.T) {
}
}
- actual := checkpoint.Make(tc.frontier, forEachSpan, tc.maxBytes)
+ baselineTS := tc.frontier.Add(tc.leadThreshold, 0)
+ actual := checkpoint.Make(baselineTS, forEachSpan, tc.maxBytes)
require.Equal(t, tc.expected, actual)
})
}
diff --git a/pkg/ccl/changefeedccl/resolvedspan/frontier.go b/pkg/ccl/changefeedccl/resolvedspan/frontier.go
index 93e348e04258..2f657e28c80f 100644
--- a/pkg/ccl/changefeedccl/resolvedspan/frontier.go
+++ b/pkg/ccl/changefeedccl/resolvedspan/frontier.go
@@ -170,7 +170,9 @@ func (f *CoordinatorFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
}
-// MakeCheckpoint creates a checkpoint based on the current state of the frontier.
-func (f *CoordinatorFrontier) MakeCheckpoint(maxBytes int64) jobspb.ChangefeedProgress_Checkpoint {
- return checkpoint.Make(f.Frontier(), f.Entries, maxBytes)
+// MakeCheckpoint checkpoints frontier spans ahead of baselineTS, up to maxBytes.
+func (f *CoordinatorFrontier) MakeCheckpoint(
+ baselineTS hlc.Timestamp, maxBytes int64,
+) jobspb.ChangefeedProgress_Checkpoint {
+ return checkpoint.Make(baselineTS, f.Entries, maxBytes)
}
@@ -293,19 +295,21 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(
return nil
}
-// HasLaggingSpans returns whether the frontier has lagging spans as defined
+// HasLeadingSpans returns whether the frontier has leading spans as defined
// by whether the frontier trails the latest timestamp by at least
-// changefeedbase.FrontierHighwaterLagCheckpointThreshold.
-func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
- lagThresholdNanos := int64(changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv))
+// changefeedbase.SpanCheckpointLeadThreshold. It also returns the lead
+// timestamp (frontier + lead threshold), or the zero value if disabled.
+func (f *resolvedSpanFrontier) HasLeadingSpans(sv *settings.Values) (bool, hlc.Timestamp) {
+ lagThresholdNanos := int64(changefeedbase.SpanCheckpointLeadThreshold.Get(sv))
if lagThresholdNanos == 0 {
- return false
+ return false, hlc.Timestamp{}
}
frontier := f.Frontier()
if frontier.IsEmpty() {
frontier = f.statementTime
}
- return frontier.Add(lagThresholdNanos, 0).Less(f.latestTS)
+ leadTS := frontier.Add(lagThresholdNanos, 0)
+ return leadTS.Less(f.latestTS), leadTS
}
// resolvedSpanBoundary encapsulates a resolved span boundary, which is