Commit

137805: sql/row: use Put instead of CPut when updating value of secondary index r=yuzefovich,stevendanna a=michae2

**sql/row: use Put instead of CPut when updating value of secondary index**

When an UPDATE statement changes the value but not the key of a secondary index (e.g. an update to the stored columns of a secondary index) we need to write a new version of the secondary index KV with the new value.

We were using a CPutAllowingIfNotExists to do this, which verified that if the KV existed, the expected value was the value before update. But there's no need for this verification. We have other mechanisms to detect a write-write conflict with any other transaction that could have changed the value concurrently. We can simply use a Put to overwrite the previous value.

This also matches what we do for the primary index when the PK doesn't change.
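As a rough illustration of the difference described above, the sketch below models a blind Put and a CPutAllowingIfNotExists on a toy in-memory map. The `toyKV` type, its methods, and the sample key are hypothetical and are not CockroachDB's `kv.Batch` API; the sketch only shows that the conditional write has to carry the pre-update value while the plain Put does not.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errConditionFailed is returned when a conditional put sees an unexpected value.
var errConditionFailed = errors.New("condition failed")

// toyKV is a hypothetical in-memory stand-in for a KV store. It is an
// assumption made for illustration, not CockroachDB's kv.Batch API.
type toyKV map[string][]byte

// Put blindly overwrites whatever is stored under key.
func (s toyKV) Put(key string, value []byte) {
	s[key] = value
}

// CPutAllowingIfNotExists writes value only if key is absent or its current
// value equals expValue.
func (s toyKV) CPutAllowingIfNotExists(key string, value, expValue []byte) error {
	cur, ok := s[key]
	if ok && !bytes.Equal(cur, expValue) {
		return errConditionFailed
	}
	s[key] = value
	return nil
}

func main() {
	const key = "/Table/110/2/a" // hypothetical secondary index key
	s := toyKV{key: []byte("old")}

	// Before: the update had to supply the pre-update value as expValue.
	err := s.CPutAllowingIfNotExists(key, []byte("new"), []byte("old"))
	fmt.Println(err, string(s[key]))

	// After: the key is unchanged, so the new value simply overwrites the old one.
	s.Put(key, []byte("newer"))
	fmt.Println(string(s[key]))
}
```

In the toy model the only thing the conditional variant adds is the comparison against the old value, which is the verification the commit message argues is redundant given the other write-write conflict mechanisms.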

Epic: None

Release note: None

---

**sql: change CPutAllowingIfNotExists with nil expValue to CPut**

CPutAllowingIfNotExists with empty expValue is equivalent to CPut with empty expValue, so change a few spots to use regular CPut. This almost gets rid of CPutAllowingIfNotExists entirely, but there is at least one spot in backfill (introduced in #138707) that needs to allow for both a non-empty expValue and non-existence of the KV.

Also drop "(expecting does not exist)" from CPut tracing, as CPut with empty expValue is now overwhelmingly the most common use of CPut. And this matches the tracing in #138707.
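The equivalence claimed above can be checked on a small model. The sketch below is an assumption-laden illustration: `kv`, `cputOK`, and `cputAllowingIfNotExistsOK` are hypothetical names that model only the success condition of each operation as the commit message describes it, not the real KV implementation.

```go
package main

import (
	"bytes"
	"fmt"
)

// kv is a hypothetical in-memory store used only to model the two conditions.
type kv map[string][]byte

// cputOK models CPut: the current value must equal expValue, where a nil
// expValue means the key must not exist.
func cputOK(s kv, key string, expValue []byte) bool {
	cur, exists := s[key]
	if expValue == nil {
		return !exists
	}
	return exists && bytes.Equal(cur, expValue)
}

// cputAllowingIfNotExistsOK models CPutAllowingIfNotExists: the CPut
// condition, except that a missing key always passes.
func cputAllowingIfNotExistsOK(s kv, key string, expValue []byte) bool {
	_, exists := s[key]
	return !exists || cputOK(s, key, expValue)
}

func main() {
	s := kv{"present": []byte("v1")}

	// With a nil expValue the two checks agree for both existing and missing keys.
	for _, key := range []string{"present", "absent"} {
		fmt.Println(key, cputOK(s, key, nil), cputAllowingIfNotExistsOK(s, key, nil))
	}

	// With a non-empty expValue and a missing key they differ, which is the
	// case the remaining backfill call site needs.
	exp := []byte("v1")
	fmt.Println(cputOK(s, "absent", exp), cputAllowingIfNotExistsOK(s, "absent", exp))
}
```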

Epic: None

Release note: None

139064: changefeedccl: rename span-level checkpoint settings for consistency r=asg0451 a=andyyang890

See individual commits

---

Epic: CRDB-37337

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
3 people committed Jan 15, 2025
3 parents 5b7eddf + 4850f7f + 64f47b4 commit f1a746c
Showing 19 changed files with 118 additions and 105 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
-changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
+changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -22,7 +22,7 @@
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lag_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -1255,11 +1255,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {

var maxCheckpointSize int64 = 100 << 20
// Ensure that checkpoints happen every time by setting a large checkpoint size.
-// Because setting 0 for the FrontierCheckpointFrequency disables checkpointing,
+// Because setting 0 for the SpanCheckpointInterval disables checkpointing,
// setting 1 nanosecond is the smallest possible value.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

// Note the tableSpan to avoid resolved events that leave no gaps
@@ -1421,9 +1421,9 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1094,11 +1094,11 @@ func newJobState(
}

func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool {
-freq := changefeedbase.FrontierCheckpointFrequency.Get(sv)
-if freq == 0 {
+interval := changefeedbase.SpanCheckpointInterval.Get(sv)
+if interval == 0 {
return false
}
-return timeutil.Since(lastCheckpoint) > freq
+return timeutil.Since(lastCheckpoint) > interval
}

func (j *jobState) canCheckpointSpans() bool {
@@ -1311,9 +1311,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
return
}
cf.js.job = job
-if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
+if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 {
log.Warning(ctx,
-"Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable")
+"span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable")
}

// Recover highwater information from job progress.
@@ -1675,7 +1675,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
// If the highwater has moved an empty checkpoint will be saved
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
}

2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1499,7 +1499,7 @@ func reconcileJobStateWithLocalState(
}
}

-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&execCfg.Settings.SV)
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&execCfg.Settings.SV)
checkpoint := checkpoint.Make(sf.Frontier(), func(forEachSpan span.Operation) {
for _, fs := range localState.aggregatorFrontier {
forEachSpan(fs.Span, fs.Timestamp)
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -2336,11 +2336,11 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {

// Checkpoint progress frequently, allow a large enough checkpoint, and
// reduce the lag threshold to allow lag checkpointing to trigger
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.ClusterSettings().SV, 100<<20)
-changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override(
+changefeedbase.SpanCheckpointLagThreshold.Override(
context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)

// We'll start changefeed with the cursor.
@@ -2522,9 +2522,9 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
require.NoError(t, jobFeed.Pause())

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

var tableSpan roachpb.Span
@@ -7341,9 +7341,9 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}

// Checkpoint progress frequently, and set the checkpoint size limit.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)

registry := s.Server.JobRegistry().(*jobs.Registry)
@@ -7492,9 +7492,9 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
return nil
}
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
-changefeedbase.FrontierCheckpointMaxBytes.Override(
+changefeedbase.SpanCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)

emittedCount := 0
@@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
// Checkpoint and trigger potential protected timestamp updates frequently.
// Make the protected timestamp lag long enough that it shouldn't be
// immediately updated after a restart.
-changefeedbase.FrontierCheckpointFrequency.Override(
+changefeedbase.SpanCheckpointInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
37 changes: 23 additions & 14 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -67,28 +67,36 @@ var IdleTimeout = settings.RegisterDurationSetting(
settings.WithName("changefeed.auto_idle.timeout"),
)

-// FrontierCheckpointFrequency controls the frequency of frontier checkpoints.
-var FrontierCheckpointFrequency = settings.RegisterDurationSetting(
+// SpanCheckpointInterval controls how often span-level checkpoints
+// can be written.
+var SpanCheckpointInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_frequency",
-"controls the frequency with which span level checkpoints will be written; if 0, disabled",
+"interval at which span-level checkpoints will be written; "+
+"if 0, span-level checkpoints are disabled",
10*time.Minute,
settings.NonNegativeDuration,
+settings.WithName("changefeed.span_checkpoint.interval"),
)

-// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water
-// mark is allowed to lag behind the leading edge of the frontier before we
-// begin to attempt checkpointing spans above the high-water mark
-var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
+// SpanCheckpointLagThreshold controls the amount of time a changefeed's
+// lagging spans must lag behind its leading spans before a span-level
+// checkpoint is written.
+var SpanCheckpointLagThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.frontier_highwater_lag_checkpoint_threshold",
-"controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
+"the amount of time a changefeed's lagging (slowest) spans must lag "+
+"behind its leading (fastest) spans before a span-level checkpoint "+
+"to save leading span progress is written; if 0, span-level checkpoints "+
+"due to lagging spans is disabled",
10*time.Minute,
settings.NonNegativeDuration,
-settings.WithPublic)
+settings.WithPublic,
+settings.WithName("changefeed.span_checkpoint.lag_threshold"),
+)

-// FrontierCheckpointMaxBytes controls the maximum number of key bytes that will be added
-// to the checkpoint record.
+// SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added
+// to a span-level checkpoint record.
// Checkpoint record could be fairly large.
// Assume we have a 10T table, and a 1/2G max range size: 20K spans.
// Span frontier merges adjacent spans, so worst case we have 10K spans.
@@ -100,12 +108,13 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
// - Assume we want to have at most 150MB worth of checkpoints in the job record.
//
// Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default
-// FrontierCheckpointFrequency setting, 1 MB per checkpoint.
-var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
+// SpanCheckpointInterval setting, 1 MB per checkpoint.
+var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"changefeed.frontier_checkpoint_max_bytes",
-"controls the maximum size of the checkpoint as a total size of key bytes",
+"the maximum size of a changefeed span-level checkpoint as measured by the total size of key bytes",
1<<20, // 1 MiB
+settings.WithName("changefeed.span_checkpoint.max_bytes"),
)

// ScanRequestLimit is the number of Scan requests that can run at once.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go
@@ -151,7 +151,7 @@ func TestCheckpointCatchupTime(t *testing.T) {
defer log.Scope(t).Close(t)

const numSpans = 100
-maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Default()
+maxBytes := changefeedbase.SpanCheckpointMaxBytes.Default()
hwm := hlc.Timestamp{}
rng, _ := randutil.NewTestRand()

4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/resolvedspan/frontier.go
@@ -295,9 +295,9 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(

// HasLaggingSpans returns whether the frontier has lagging spans as defined
// by whether the frontier trails the latest timestamp by at least
-// changefeedbase.FrontierHighwaterLagCheckpointThreshold.
+// changefeedbase.SpanCheckpointLagThreshold.
func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
-lagThresholdNanos := int64(changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv))
+lagThresholdNanos := int64(changefeedbase.SpanCheckpointLagThreshold.Get(sv))
if lagThresholdNanos == 0 {
return false
}
@@ -446,7 +446,7 @@ CPut /Table/110/1/"@"/1/0 -> nil (tombstone)
CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone)
CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone)
CPut /Table/110/2/"\xa0"/6/0 -> /BYTES/0x89
-CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a (expecting does not exist)
+CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a
CPut /Table/110/2/" "/4/0 -> nil (tombstone)
CPut /Table/110/2/"@"/4/0 -> nil (tombstone)
CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone)
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -1051,13 +1051,13 @@ func runCDCInitialScanRollingRestart(
switch checkpointType {
case cdcNormalCheckpoint:
setupStmts = append(setupStmts,
-`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
+`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
)
case cdcShutdownCheckpoint:
const largeSplitCount = 5
setupStmts = append(setupStmts,
-`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
+`SET CLUSTER SETTING changefeed.span_checkpoint.interval = '0'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4),
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -173,8 +173,8 @@ func makeCDCBenchOptions(c cluster.Cluster) (option.StartOpts, install.ClusterSe

// Checkpoint frequently. Some of the larger benchmarks might overload the
// cluster. Producing frequent span-level checkpoints helps with recovery.
-settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s"
-settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s"
+settings.ClusterSettings["changefeed.span_checkpoint.interval"] = "60s"
+settings.ClusterSettings["changefeed.span_checkpoint.lag_threshold"] = "30s"

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per min minutes).
2 changes: 1 addition & 1 deletion pkg/sql/catalog/bootstrap/kv_writer.go
@@ -97,7 +97,7 @@ func (w KVWriter) Insert(
if kvTrace {
log.VEventf(ctx, 2, "CPut %s -> %s", kv.Key, kv.Value)
}
-b.CPutAllowingIfNotExists(kv.Key, &kv.Value, nil /* expValue */)
+b.CPut(kv.Key, &kv.Value, nil /* expValue */)
}
return nil
}