changefeedccl: rename span-level checkpoint settings for consistency #139064

Merged
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
-changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
+changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -22,7 +22,7 @@
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lag_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -1219,11 +1219,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {

var maxCheckpointSize int64 = 100 << 20
// Ensure that checkpoints happen every time by setting a large checkpoint size.
-// Because setting 0 for the FrontierCheckpointFrequency disables checkpointing,
+// Because setting 0 for the SpanCheckpointInterval disables checkpointing,
 // setting 1 nanosecond is the smallest possible value.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

// Note the tableSpan to avoid resolved events that leave no gaps
@@ -1385,9 +1385,9 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1094,11 +1094,11 @@ func newJobState(
}

func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool {
-freq := changefeedbase.FrontierCheckpointFrequency.Get(sv)
-if freq == 0 {
+interval := changefeedbase.SpanCheckpointInterval.Get(sv)
+if interval == 0 {
 return false
 }
-return timeutil.Since(lastCheckpoint) > freq
+return timeutil.Since(lastCheckpoint) > interval
}

func (j *jobState) canCheckpointSpans() bool {
@@ -1311,9 +1311,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
return
}
cf.js.job = job
-if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
+if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
 log.Warning(ctx,
-"Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable")
+"span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable")
}

// Recover highwater information from job progress.
@@ -1675,7 +1675,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
// If the highwater has moved an empty checkpoint will be saved
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
}

2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1499,7 +1499,7 @@ func reconcileJobStateWithLocalState(
}
}

-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&execCfg.Settings.SV)
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&execCfg.Settings.SV)
checkpoint := checkpoint.Make(sf.Frontier(), func(forEachSpan span.Operation) {
for _, fs := range localState.aggregatorFrontier {
forEachSpan(fs.Span, fs.Timestamp)
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -2336,11 +2336,11 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {

// Checkpoint progress frequently, allow a large enough checkpoint, and
// reduce the lag threshold to allow lag checkpointing to trigger
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.ClusterSettings().SV, 100<<20)
-changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override(
+changefeedbase.SpanCheckpointLagThreshold.Override(
 context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)

// We'll start changefeed with the cursor.
@@ -2522,9 +2522,9 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
require.NoError(t, jobFeed.Pause())

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

var tableSpan roachpb.Span
@@ -7341,9 +7341,9 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
@@ -7492,9 +7492,9 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
return nil
}
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
 context.Background(), &s.Server.ClusterSettings().SV, 100<<20)

emittedCount := 0
@@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
// Checkpoint and trigger potential protected timestamp updates frequently.
// Make the protected timestamp lag long enough that it shouldn't be
// immediately updated after a restart.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
37 changes: 23 additions & 14 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -67,28 +67,36 @@ var IdleTimeout = settings.RegisterDurationSetting(
settings.WithName("changefeed.auto_idle.timeout"),
)

-// FrontierCheckpointFrequency controls the frequency of frontier checkpoints.
-var FrontierCheckpointFrequency = settings.RegisterDurationSetting(
+// SpanCheckpointInterval controls how often span-level checkpoints
+// can be written.
+var SpanCheckpointInterval = settings.RegisterDurationSetting(
 settings.ApplicationLevel,
 "changefeed.frontier_checkpoint_frequency",
-"controls the frequency with which span level checkpoints will be written; if 0, disabled",
+"interval at which span-level checkpoints will be written; "+
+"if 0, span-level checkpoints are disabled",
 10*time.Minute,
 settings.NonNegativeDuration,
+settings.WithName("changefeed.span_checkpoint.interval"),
)

-// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
-// mark is allowed to lag behind the leading edge of the frontier before we
-// begin to attempt checkpointing spans above the high-water mark
-var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
+// SpanCheckpointLagThreshold controls the amount of time a changefeed's
+// lagging spans must lag behind its leading spans before a span-level
+// checkpoint is written.
+var SpanCheckpointLagThreshold = settings.RegisterDurationSetting(
 settings.ApplicationLevel,
 "changefeed.frontier_highwater_lag_checkpoint_threshold",
-"controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
+"the amount of time a changefeed's lagging (slowest) spans must lag "+
+"behind its leading (fastest) spans before a span-level checkpoint "+
+"to save leading span progress is written; if 0, span-level checkpoints "+
+"due to lagging spans is disabled",
 10*time.Minute,
 settings.NonNegativeDuration,
-settings.WithPublic)
+settings.WithPublic,
+settings.WithName("changefeed.span_checkpoint.lag_threshold"),
+)

-// FrontierCheckpointMaxBytes controls the maximum number of key bytes that will be added
-// to the checkpoint record.
+// SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added
+// to a span-level checkpoint record.
// Checkpoint record could be fairly large.
// Assume we have a 10T table, and a 1/2G max range size: 20K spans.
// Span frontier merges adjacent spans, so worst case we have 10K spans.
@@ -100,12 +108,13 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
// - Assume we want to have at most 150MB worth of checkpoints in the job record.
//
// Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default
-// FrontierCheckpointFrequency setting, 1 MB per checkpoint.
-var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
+// SpanCheckpointInterval setting, 1 MB per checkpoint.
+var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting(
 settings.ApplicationLevel,
 "changefeed.frontier_checkpoint_max_bytes",
-"controls the maximum size of the checkpoint as a total size of key bytes",
+"the maximum size of a changefeed span-level checkpoint as measured by the total size of key bytes",
 1<<20, // 1 MiB
+settings.WithName("changefeed.span_checkpoint.max_bytes"),
)

// ScanRequestLimit is the number of Scan requests that can run at once.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
@@ -151,7 +151,7 @@ func TestCheckpointCatchupTime(t *testing.T) {
defer log.Scope(t).Close(t)

const numSpans = 100
-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Default()
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Default()
hwm := hlc.Timestamp{}
rng, _ := randutil.NewTestRand()

4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/resolvedspan/frontier.go
@@ -295,9 +295,9 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(

// HasLaggingSpans returns whether the frontier has lagging spans as defined
// by whether the frontier trails the latest timestamp by at least
-// changefeedbase.FrontierHighwaterLagCheckpointThreshold.
+// changefeedbase.SpanCheckpointLagThreshold.
 func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
-lagThresholdNanos := int64(changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv))
+lagThresholdNanos := int64(changefeedbase.SpanCheckpointLagThreshold.Get(sv))
if lagThresholdNanos == 0 {
return false
}
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -1047,13 +1047,13 @@ func runCDCInitialScanRollingRestart(
switch checkpointType {
case cdcNormalCheckpoint:
setupStmts = append(setupStmts,
-`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
+`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
)
case cdcShutdownCheckpoint:
const largeSplitCount = 5
setupStmts = append(setupStmts,
-`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
+`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '0'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4),
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -173,8 +173,8 @@ func makeCDCBenchOptions(c cluster.Cluster) (option.StartOpts, install.ClusterSe

// Checkpoint frequently. Some of the larger benchmarks might overload the
// cluster. Producing frequent span-level checkpoints helps with recovery.
-settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s"
-settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s"
+settings.ClusterSettings["changefeed.span_checkpoint.interval"] = "60s"
+settings.ClusterSettings["changefeed.span_checkpoint.lag_threshold"] = "30s"

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per min minutes).
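
Reviewer note: for scripts or test harnesses that still use the old setting names, the three renames in this PR can be collected in one lookup table. The sketch below takes the pairs directly from the `settings.WithName(...)` calls in settings.go; the helper `currentSettingName` is hypothetical and not part of the PR. The generated docs in this PR list the old lag-threshold name as an alias, so this mapping is a convenience for updating callers, not a claim that the old names stop working.

```go
package main

import "fmt"

// renamedSettings maps each pre-rename cluster setting name to the name
// introduced by this PR, as registered via settings.WithName in settings.go.
var renamedSettings = map[string]string{
	"changefeed.frontier_checkpoint_frequency":               "changefeed.span_checkpoint.interval",
	"changefeed.frontier_highwater_lag_checkpoint_threshold": "changefeed.span_checkpoint.lag_threshold",
	"changefeed.frontier_checkpoint_max_bytes":               "changefeed.span_checkpoint.max_bytes",
}

// currentSettingName returns the post-rename name for a setting, or the
// input unchanged if the setting was not renamed by this PR.
func currentSettingName(name string) string {
	if renamed, ok := renamedSettings[name]; ok {
		return renamed
	}
	return name
}

func main() {
	// A slice keeps the output order deterministic.
	for _, name := range []string{
		"changefeed.frontier_checkpoint_frequency",
		"changefeed.frontier_highwater_lag_checkpoint_threshold",
		"changefeed.shutdown_checkpoint.enabled", // untouched by this PR
	} {
		fmt.Printf("%s -> %s\n", name, currentSettingName(name))
	}
}
```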