changefeedccl: only checkpoint leading spans above lead threshold #138790

Closed
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
changefeed.span_checkpoint.lead_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -22,7 +22,7 @@
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lead_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s leading spans must be ahead of its slowest spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to leading spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -1219,11 +1219,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {

var maxCheckpointSize int64 = 100 << 20
// Ensure that checkpoints happen every time by setting a large checkpoint size.
// Because setting 0 for the FrontierCheckpointFrequency disables checkpointing,
// Because setting 0 for the SpanCheckpointInterval disables checkpointing,
// setting 1 nanosecond is the smallest possible value.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

// Note the tableSpan to avoid resolved events that leave no gaps
@@ -1385,9 +1385,9 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
27 changes: 17 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -884,9 +884,10 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu

// At a lower frequency, we checkpoint specific spans in the job progress
// either in backfills or if the highwater mark is excessively lagging behind.
inBackfill := ca.frontier.InBackfill(resolved)
hasLeadingSpans, _ := ca.frontier.HasLeadingSpans(sv)
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
canCheckpointSpans(sv, ca.lastSpanFlush)
(inBackfill || hasLeadingSpans) && canCheckpointSpans(sv, ca.lastSpanFlush)

if checkpointSpans {
defer func() {
@@ -1094,11 +1095,11 @@ func newJobState(
}

func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool {
freq := changefeedbase.FrontierCheckpointFrequency.Get(sv)
if freq == 0 {
interval := changefeedbase.SpanCheckpointInterval.Get(sv)
if interval == 0 {
return false
}
return timeutil.Since(lastCheckpoint) > freq
return timeutil.Since(lastCheckpoint) > interval
}

func (j *jobState) canCheckpointSpans() bool {
@@ -1311,9 +1312,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
return
}
cf.js.job = job
if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
log.Warning(ctx,
"Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable")
"span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable")
}

// Recover highwater information from job progress.
@@ -1670,13 +1671,19 @@ func (cf *changeFrontier) maybeCheckpointJob(
// highwater mark remains fixed while other spans may significantly outpace
// it, therefore to avoid losing that progress on changefeed resumption we
// also store as many of those leading spans as we can in the job progress
updateCheckpoint := (inBackfill || cf.frontier.HasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()
hasLeadingSpans, leadTS := cf.frontier.HasLeadingSpans(&cf.js.settings.SV)
updateCheckpoint := (inBackfill || hasLeadingSpans) && cf.js.canCheckpointSpans()

// If the highwater has moved an empty checkpoint will be saved
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
frontier := cf.frontier.Frontier()
if hasLeadingSpans {
checkpoint = cf.frontier.MakeCheckpoint(leadTS, maxBytes)
} else {
checkpoint = cf.frontier.MakeCheckpoint(frontier, maxBytes)
}
}

if updateCheckpoint || updateHighWater {
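The checkpoint decision in `maybeCheckpointJob` above can be sketched in isolation. This is a minimal sketch under stated assumptions, not the CockroachDB implementation: timestamps and durations are plain `int64` seconds rather than `hlc.Timestamp` and `time.Duration`, and `hasLeadingSpans` is a hypothetical stand-in for `frontier.HasLeadingSpans`, whose body does not appear in this diff. The sketch assumes that method reports whether the most advanced span is more than the lead threshold ahead of the frontier, and that the timestamp it returns is the frontier plus the threshold.

```go
package main

import "fmt"

// canCheckpointSpans mirrors the gating shown in the diff: an interval of 0
// disables span-level checkpoints; otherwise more than one interval must have
// elapsed since the last checkpoint.
func canCheckpointSpans(interval, sinceLast int64) bool {
	if interval == 0 {
		return false
	}
	return sinceLast > interval
}

// hasLeadingSpans is a HYPOTHETICAL stand-in for frontier.HasLeadingSpans.
// Assumption: a threshold of 0 disables the check, and the returned timestamp
// is frontier + threshold.
func hasLeadingSpans(frontier, latest, threshold int64) (bool, int64) {
	if threshold == 0 {
		return false, 0
	}
	leadTS := frontier + threshold
	return latest > leadTS, leadTS
}

// checkpointBaseline returns the timestamp above which spans would be
// checkpointed, and whether a span-level checkpoint should be written at all.
// As in the diff, leading spans take priority over the backfill case.
func checkpointBaseline(
	inBackfill bool, frontier, latest, leadThreshold, interval, sinceLast int64,
) (int64, bool) {
	leading, leadTS := hasLeadingSpans(frontier, latest, leadThreshold)
	if !(inBackfill || leading) || !canCheckpointSpans(interval, sinceLast) {
		return 0, false
	}
	if leading {
		return leadTS, true
	}
	return frontier, true
}

func main() {
	baseline, ok := checkpointBaseline(false, 100, 1000, 600, 600, 601)
	fmt.Println(baseline, ok)
}
```

With a leading span, only spans above the lead threshold are saved; during a backfill without leading spans, everything above the frontier is eligible.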
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1499,7 +1499,7 @@ func reconcileJobStateWithLocalState(
}
}

maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&execCfg.Settings.SV)
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&execCfg.Settings.SV)
checkpoint := checkpoint.Make(sf.Frontier(), func(forEachSpan span.Operation) {
for _, fs := range localState.aggregatorFrontier {
forEachSpan(fs.Span, fs.Timestamp)
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -2336,11 +2336,11 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {

// Checkpoint progress frequently, allow a large enough checkpoint, and
// reduce the lag threshold to allow lag checkpointing to trigger
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.ClusterSettings().SV, 100<<20)
changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override(
changefeedbase.SpanCheckpointLeadThreshold.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)

// We'll start changefeed with the cursor.
@@ -2522,9 +2522,9 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
require.NoError(t, jobFeed.Pause())

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

var tableSpan roachpb.Span
@@ -7341,9 +7341,9 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
@@ -7492,9 +7492,9 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
return nil
}
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.FrontierCheckpointMaxBytes.Override(
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)

emittedCount := 0
@@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
// Checkpoint and trigger potential protected timestamp updates frequently.
// Make the protected timestamp lag long enough that it shouldn't be
// immediately updated after a restart.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
36 changes: 22 additions & 14 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -67,28 +67,35 @@ var IdleTimeout = settings.RegisterDurationSetting(
settings.WithName("changefeed.auto_idle.timeout"),
)

// FrontierCheckpointFrequency controls the frequency of frontier checkpoints.
var FrontierCheckpointFrequency = settings.RegisterDurationSetting(
// SpanCheckpointInterval controls how often span-level checkpoints
// can be written.
var SpanCheckpointInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_frequency",
"controls the frequency with which span level checkpoints will be written; if 0, disabled",
"interval at which span-level checkpoints will be written; "+
"if 0, span-level checkpoints are disabled",
10*time.Minute,
settings.NonNegativeDuration,
settings.WithName("changefeed.span_checkpoint.interval"),
)

// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
// mark is allowed to lag behind the leading edge of the frontier before we
// begin to attempt checkpointing spans above the high-water mark
var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
// SpanCheckpointLeadThreshold controls the amount of time a changefeed's
// leading spans must be ahead of its slowest spans before a span-level
// checkpoint is written.
var SpanCheckpointLeadThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_highwater_lag_checkpoint_threshold",
"controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
"the amount of time a changefeed's leading spans must be ahead of its "+
"slowest spans before a span-level checkpoint to save leading span progress "+
"is written; if 0, span-level checkpoints due to leading spans is disabled",
10*time.Minute,
settings.NonNegativeDuration,
settings.WithPublic)
settings.WithPublic,
settings.WithName("changefeed.span_checkpoint.lead_threshold"),
)

// FrontierCheckpointMaxBytes controls the maximum number of key bytes that will be added
// to the checkpoint record.
// SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added
// to a span-level checkpoint record.
// Checkpoint record could be fairly large.
// Assume we have a 10T table, and a 1/2G max range size: 20K spans.
// Span frontier merges adjacent spans, so worst case we have 10K spans.
@@ -100,12 +107,13 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
// - Assume we want to have at most 150MB worth of checkpoints in the job record.
//
// Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default
// FrontierCheckpointFrequency setting, 1 MB per checkpoint.
var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
// SpanCheckpointInterval setting, 1 MB per checkpoint.
var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_max_bytes",
"controls the maximum size of the checkpoint as a total size of key bytes",
"the maximum size of a changefeed span-level checkpoint as measured by the total size of key bytes",
1<<20, // 1 MiB
settings.WithName("changefeed.span_checkpoint.max_bytes"),
)

// ScanRequestLimit is the number of Scan requests that can run at once.
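The final step of the sizing comment on `SpanCheckpointMaxBytes` above (6 MB of checkpoint per hour, spread over the default 10-minute interval, gives 1 MB per checkpoint) is simple arithmetic. A minimal sketch follows; the function name `perCheckpointBudget` is hypothetical, and "MB" is treated as MiB so the result lines up with the `1<<20` default in the diff.

```go
package main

import "fmt"

// perCheckpointBudget divides an hourly byte budget evenly across the
// checkpoints that fit in one hour at the given interval (in minutes).
// A non-positive interval means span-level checkpoints are disabled.
func perCheckpointBudget(bytesPerHour, intervalMinutes int64) int64 {
	if intervalMinutes <= 0 {
		return 0
	}
	checkpointsPerHour := 60 / intervalMinutes
	if checkpointsPerHour < 1 {
		// Intervals longer than an hour: at most one checkpoint per hour.
		checkpointsPerHour = 1
	}
	return bytesPerHour / checkpointsPerHour
}

func main() {
	// 6 MiB per hour at a 10-minute interval.
	fmt.Println(perCheckpointBudget(6<<20, 10))
}
```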
25 changes: 14 additions & 11 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint.go
@@ -18,27 +18,30 @@ import (
type SpanIter func(forEachSpan span.Operation)

// Make creates a checkpoint with as many spans that should be checkpointed (are
// above the highwater mark) as can fit in maxBytes, along with the earliest
// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
// spans above the high-water mark.
// above the provided baseline timestamp) as can fit in maxBytes, along with the
// earliest timestamp of the checkpointed spans. A SpanGroup is used to merge
// adjacent spans to minimize the size of the checkpoint.
func Make(
frontier hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
baselineTS hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
) jobspb.ChangefeedProgress_Checkpoint {
// Collect leading spans into a SpanGroup to merge adjacent spans and store
// the lowest timestamp found
// the lowest timestamp found.
var checkpointSpanGroup roachpb.SpanGroup
checkpointFrontier := hlc.Timestamp{}
checkpointTS := hlc.MaxTimestamp
forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
if frontier.Less(ts) {
if baselineTS.Less(ts) {
checkpointSpanGroup.Add(s)
if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
checkpointFrontier = ts
if ts.Less(checkpointTS) {
checkpointTS = ts
}
}
return span.ContinueMatch
})
if checkpointSpanGroup.Len() == 0 {
return jobspb.ChangefeedProgress_Checkpoint{}
}

// Ensure we only return up to maxBytes spans
// Ensure we only return up to maxBytes spans.
var checkpointSpans []roachpb.Span
var used int64
for _, span := range checkpointSpanGroup.Slice() {
@@ -51,6 +54,6 @@ func Make(

return jobspb.ChangefeedProgress_Checkpoint{
Spans: checkpointSpans,
Timestamp: checkpointFrontier,
Timestamp: checkpointTS,
}
}
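
The behaviour of the revised `Make` can be illustrated with a simplified, self-contained sketch. It is not the CockroachDB code: `span`, `resolved`, and `checkpoint` are hypothetical stand-ins for `roachpb.Span`, the frontier entries, and `jobspb.ChangefeedProgress_Checkpoint`; timestamps are plain `int64`; and the merging of adjacent spans done by `roachpb.SpanGroup` is omitted. The byte accounting in the truncation loop is an assumption, because that part of the function is elided in the diff.

```go
package main

import (
	"fmt"
	"math"
)

// span is a hypothetical stand-in for roachpb.Span.
type span struct{ start, end string }

// resolved pairs a span with the timestamp it has been resolved to.
type resolved struct {
	sp span
	ts int64
}

// checkpoint is a hypothetical stand-in for ChangefeedProgress_Checkpoint.
type checkpoint struct {
	spans []span
	ts    int64
}

// makeCheckpoint keeps spans strictly above baselineTS, records the lowest
// timestamp among them, and returns as many spans as fit in maxBytes.
func makeCheckpoint(baselineTS int64, rs []resolved, maxBytes int64) checkpoint {
	var picked []span
	minTS := int64(math.MaxInt64)
	for _, r := range rs {
		if baselineTS < r.ts {
			picked = append(picked, r.sp)
			if r.ts < minTS {
				minTS = r.ts
			}
		}
	}
	// As in the diff, return an empty checkpoint when nothing qualifies.
	if len(picked) == 0 {
		return checkpoint{}
	}

	// ASSUMPTION: size is counted as the total length of start and end keys.
	var out []span
	var used int64
	for _, s := range picked {
		used += int64(len(s.start)) + int64(len(s.end))
		if used > maxBytes {
			break
		}
		out = append(out, s)
	}
	return checkpoint{spans: out, ts: minTS}
}

func main() {
	rs := []resolved{
		{span{"a", "b"}, 5},
		{span{"b", "c"}, 20},
		{span{"c", "d"}, 15},
	}
	fmt.Printf("%+v\n", makeCheckpoint(10, rs, 100))
}
```

Passing a higher baseline, such as a lead-threshold timestamp instead of the frontier, shrinks the set of spans that qualify, which is the effect the PR title describes.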