From bf608152e604253b72a65480a3ad1628065166d5 Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Mon, 13 Jan 2025 16:16:17 -0500 Subject: [PATCH] changefeedccl: rename changefeed.frontier_checkpoint_frequency setting This patch renames `changefeed.frontier_checkpoint_frequency` to `changefeed.span_checkpoint.interval` for consistency. Release note: None --- pkg/ccl/changefeedccl/alter_changefeed_test.go | 6 +++--- pkg/ccl/changefeedccl/changefeed_processors.go | 10 +++++----- pkg/ccl/changefeedccl/changefeed_test.go | 10 +++++----- pkg/ccl/changefeedccl/changefeedbase/settings.go | 11 +++++++---- pkg/cmd/roachtest/tests/cdc.go | 4 ++-- pkg/cmd/roachtest/tests/cdc_bench.go | 2 +- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index bd93464d5166..764d0b94d568 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1219,9 +1219,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { var maxCheckpointSize int64 = 100 << 20 // Ensure that checkpoints happen every time by setting a large checkpoint size. - // Because setting 0 for the FrontierCheckpointFrequency disables checkpointing, + // Because setting 0 for the SpanCheckpointInterval disables checkpointing, // setting 1 nanosecond is the smallest possible value. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) @@ -1385,7 +1385,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { } // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index c0848b046b92..94630f8fb3ce 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1094,11 +1094,11 @@ func newJobState( } func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool { - freq := changefeedbase.FrontierCheckpointFrequency.Get(sv) - if freq == 0 { + interval := changefeedbase.SpanCheckpointInterval.Get(sv) + if interval == 0 { return false } - return timeutil.Since(lastCheckpoint) > freq + return timeutil.Since(lastCheckpoint) > interval } func (j *jobState) canCheckpointSpans() bool { @@ -1311,9 +1311,9 @@ func (cf *changeFrontier) Start(ctx context.Context) { return } cf.js.job = job - if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 { + if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 { log.Warning(ctx, - "Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable") + "span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable") } // Recover highwater information from job progress. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9968b5798890..94222df075aa 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2336,7 +2336,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) { // Checkpoint progress frequently, allow a large enough checkpoint, and // reduce the lag threshold to allow lag checkpointing to trigger - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.ClusterSettings().SV, 100<<20) @@ -2522,7 +2522,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { require.NoError(t, jobFeed.Pause()) // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) @@ -7341,7 +7341,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { } // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) @@ -7492,7 +7492,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) { b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25) return nil } - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, 100<<20) @@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) { // Checkpoint and trigger potential protected timestamp updates frequently. // Make the protected timestamp lag long enough that it shouldn't be // immediately updated after a restart. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond) changefeedbase.ProtectTimestampInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index c1ae7aec921e..f28590b92ed0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -67,13 +67,16 @@ var IdleTimeout = settings.RegisterDurationSetting( settings.WithName("changefeed.auto_idle.timeout"), ) -// FrontierCheckpointFrequency controls the frequency of frontier checkpoints. -var FrontierCheckpointFrequency = settings.RegisterDurationSetting( +// SpanCheckpointInterval controls how often span-level checkpoints +// can be written. +var SpanCheckpointInterval = settings.RegisterDurationSetting( settings.ApplicationLevel, "changefeed.frontier_checkpoint_frequency", - "controls the frequency with which span level checkpoints will be written; if 0, disabled", + "interval at which span-level checkpoints will be written; "+ + "if 0, span-level checkpoints are disabled", 10*time.Minute, settings.NonNegativeDuration, + settings.WithName("changefeed.span_checkpoint.interval"), ) // FrontierHighwaterLagCheckpointThreshold controls the amount the high-water @@ -100,7 +103,7 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting( // - Assume we want to have at most 150MB worth of checkpoints in the job record. // // Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default -// FrontierCheckpointFrequency setting, 1 MB per checkpoint. +// SpanCheckpointInterval setting, 1 MB per checkpoint. var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting( settings.ApplicationLevel, "changefeed.frontier_checkpoint_max_bytes", diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 3cdde1b96141..44a1a90490e5 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1047,13 +1047,13 @@ func runCDCInitialScanRollingRestart( switch checkpointType { case cdcNormalCheckpoint: setupStmts = append(setupStmts, - `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`, + `SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'`, `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`, ) case cdcShutdownCheckpoint: const largeSplitCount = 5 setupStmts = append(setupStmts, - `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`, + `SET CLUSTER SETTING changefeed.span_checkpoint.interval = '0'`, `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`, // Split some bigger chunks up to scatter it a bit more. fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4), diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go index fd17c9ea067f..d501c48835d3 100644 --- a/pkg/cmd/roachtest/tests/cdc_bench.go +++ b/pkg/cmd/roachtest/tests/cdc_bench.go @@ -173,7 +173,7 @@ func makeCDCBenchOptions(c cluster.Cluster) (option.StartOpts, install.ClusterSe // Checkpoint frequently. Some of the larger benchmarks might overload the // cluster. Producing frequent span-level checkpoints helps with recovery. - settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s" + settings.ClusterSettings["changefeed.span_checkpoint.interval"] = "60s" settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s" // Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default