Skip to content

Commit

Permalink
changefeedccl: rename changefeed.frontier_checkpoint_frequency setting
Browse files Browse the repository at this point in the history
This patch renames `changefeed.frontier_checkpoint_frequency` to
`changefeed.span_checkpoint.interval` for consistency.

Release note: None
  • Loading branch information
andyyang890 committed Jan 14, 2025
1 parent 3c7e235 commit bf60815
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,9 +1219,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {

var maxCheckpointSize int64 = 100 << 20
// Ensure that checkpoints happen every time by setting a large checkpoint size.
// Because setting 0 for the FrontierCheckpointFrequency disables checkpointing,
// Because setting 0 for the SpanCheckpointInterval disables checkpointing,
// setting 1 nanosecond is the smallest possible value.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
Expand Down Expand Up @@ -1385,7 +1385,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,11 +1094,11 @@ func newJobState(
}

func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool {
freq := changefeedbase.FrontierCheckpointFrequency.Get(sv)
if freq == 0 {
interval := changefeedbase.SpanCheckpointInterval.Get(sv)
if interval == 0 {
return false
}
return timeutil.Since(lastCheckpoint) > freq
return timeutil.Since(lastCheckpoint) > interval
}

func (j *jobState) canCheckpointSpans() bool {
Expand Down Expand Up @@ -1311,9 +1311,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
return
}
cf.js.job = job
if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
log.Warning(ctx,
"Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable")
"span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable")
}

// Recover highwater information from job progress.
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {

// Checkpoint progress frequently, allow a large enough checkpoint, and
// reduce the lag threshold to allow lag checkpointing to trigger
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.ClusterSettings().SV, 100<<20)
Expand Down Expand Up @@ -2522,7 +2522,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
require.NoError(t, jobFeed.Pause())

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
Expand Down Expand Up @@ -7341,7 +7341,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
Expand Down Expand Up @@ -7492,7 +7492,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
return nil
}
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)
Expand Down Expand Up @@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
// Checkpoint and trigger potential protected timestamp updates frequently.
// Make the protected timestamp lag long enough that it shouldn't be
// immediately updated after a restart.
changefeedbase.FrontierCheckpointFrequency.Override(
changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ var IdleTimeout = settings.RegisterDurationSetting(
settings.WithName("changefeed.auto_idle.timeout"),
)

// FrontierCheckpointFrequency controls the frequency of frontier checkpoints.
var FrontierCheckpointFrequency = settings.RegisterDurationSetting(
// SpanCheckpointInterval controls how often span-level checkpoints
// can be written.
var SpanCheckpointInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_frequency",
"controls the frequency with which span level checkpoints will be written; if 0, disabled",
"interval at which span-level checkpoints will be written; "+
"if 0, span-level checkpoints are disabled",
10*time.Minute,
settings.NonNegativeDuration,
settings.WithName("changefeed.span_checkpoint.interval"),
)

// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
Expand All @@ -100,7 +103,7 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
// - Assume we want to have at most 150MB worth of checkpoints in the job record.
//
// Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default
// FrontierCheckpointFrequency setting, 1 MB per checkpoint.
// SpanCheckpointInterval setting, 1 MB per checkpoint.
var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_max_bytes",
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,13 +1047,13 @@ func runCDCInitialScanRollingRestart(
switch checkpointType {
case cdcNormalCheckpoint:
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
)
case cdcShutdownCheckpoint:
const largeSplitCount = 5
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '0'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4),
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func makeCDCBenchOptions(c cluster.Cluster) (option.StartOpts, install.ClusterSe

// Checkpoint frequently. Some of the larger benchmarks might overload the
// cluster. Producing frequent span-level checkpoints helps with recovery.
settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s"
settings.ClusterSettings["changefeed.span_checkpoint.interval"] = "60s"
settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s"

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
Expand Down

0 comments on commit bf60815

Please sign in to comment.