diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index b319585a02da..5731bb6fa7d9 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,7 +17,7 @@ changefeed.default_range_distribution_strategy enumeration default configures ho changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application changefeed.fast_gzip.enabled boolean true use fast gzip implementation application -changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application +changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 4c11feeb0ea4..494baac071e0 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -22,7 +22,7 @@
changefeed.event_consumer_worker_queue_size
integer16if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can bufferServerless/Dedicated/Self-Hosted
changefeed.event_consumer_workers
integer0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabledServerless/Dedicated/Self-Hosted
changefeed.fast_gzip.enabled
booleantrueuse fast gzip implementationServerless/Dedicated/Self-Hosted -
changefeed.frontier_highwater_lag_checkpoint_threshold
duration10m0scontrols the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabledServerless/Dedicated/Self-Hosted +
changefeed.span_checkpoint.lag_threshold
(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)
duration10m0sthe amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabledServerless/Dedicated/Self-Hosted
changefeed.memory.per_changefeed_limit
byte size512 MiBcontrols amount of data that can be buffered per changefeedServerless/Dedicated/Self-Hosted
changefeed.resolved_timestamp.min_update_interval
(alias: changefeed.min_highwater_advance)
duration0sminimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progressServerless/Dedicated/Self-Hosted
changefeed.node_throttle_config
stringspecifies node level throttling configuration for all changefeeedsServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 86a0aa35cd57..96b2d3c39758 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1255,11 +1255,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { var maxCheckpointSize int64 = 100 << 20 // Ensure that checkpoints happen every time by setting a large checkpoint size. - // Because setting 0 for the FrontierCheckpointFrequency disables checkpointing, + // Because setting 0 for the SpanCheckpointInterval disables checkpointing, // setting 1 nanosecond is the smallest possible value. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1*time.Nanosecond) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) // Note the tableSpan to avoid resolved events that leave no gaps @@ -1421,9 +1421,9 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { } // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) registry := s.Server.JobRegistry().(*jobs.Registry) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 0cc941df7383..94630f8fb3ce 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1094,11 +1094,11 @@ func newJobState( } func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool { - freq := changefeedbase.FrontierCheckpointFrequency.Get(sv) - if freq == 0 { + interval := changefeedbase.SpanCheckpointInterval.Get(sv) + if interval == 0 { return false } - return timeutil.Since(lastCheckpoint) > freq + return timeutil.Since(lastCheckpoint) > interval } func (j *jobState) canCheckpointSpans() bool { @@ -1311,9 +1311,9 @@ func (cf *changeFrontier) Start(ctx context.Context) { return } cf.js.job = job - if changefeedbase.FrontierCheckpointFrequency.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 { + if changefeedbase.SpanCheckpointInterval.Get(&cf.FlowCtx.Cfg.Settings.SV) == 0 { log.Warning(ctx, - "Frontier checkpointing disabled; set changefeed.frontier_checkpoint_frequency to non-zero value to re-enable") + "span-level checkpointing disabled; set changefeed.span_checkpoint.interval to positive duration to re-enable") } // Recover highwater information from job progress. @@ -1675,7 +1675,7 @@ func (cf *changeFrontier) maybeCheckpointJob( // If the highwater has moved an empty checkpoint will be saved var checkpoint jobspb.ChangefeedProgress_Checkpoint if updateCheckpoint { - maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV) + maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV) checkpoint = cf.frontier.MakeCheckpoint(maxBytes) } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 7ea75cbe4aa1..958bf29d2aa2 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1499,7 +1499,7 @@ func reconcileJobStateWithLocalState( } } - maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&execCfg.Settings.SV) + maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&execCfg.Settings.SV) checkpoint := checkpoint.Make(sf.Frontier(), func(forEachSpan span.Operation) { for _, fs := range localState.aggregatorFrontier { forEachSpan(fs.Span, fs.Timestamp) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e9ebd82b77cd..3e7fbf2c1bc8 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2336,11 +2336,11 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) { // Checkpoint progress frequently, allow a large enough checkpoint, and // reduce the lag threshold to allow lag checkpointing to trigger - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.ClusterSettings().SV, 100<<20) - changefeedbase.FrontierHighwaterLagCheckpointThreshold.Override( + changefeedbase.SpanCheckpointLagThreshold.Override( context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond) // We'll start changefeed with the cursor. @@ -2522,9 +2522,9 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { require.NoError(t, jobFeed.Pause()) // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) var tableSpan roachpb.Span @@ -7341,9 +7341,9 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { } // Checkpoint progress frequently, and set the checkpoint size limit. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) registry := s.Server.JobRegistry().(*jobs.Registry) @@ -7492,9 +7492,9 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) { b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25) return nil } - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 1) - changefeedbase.FrontierCheckpointMaxBytes.Override( + changefeedbase.SpanCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, 100<<20) emittedCount := 0 @@ -9936,7 +9936,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) { // Checkpoint and trigger potential protected timestamp updates frequently. // Make the protected timestamp lag long enough that it shouldn't be // immediately updated after a restart. - changefeedbase.FrontierCheckpointFrequency.Override( + changefeedbase.SpanCheckpointInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond) changefeedbase.ProtectTimestampInterval.Override( context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index e327c13c9757..99673a19ae2c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -67,28 +67,36 @@ var IdleTimeout = settings.RegisterDurationSetting( settings.WithName("changefeed.auto_idle.timeout"), ) -// FrontierCheckpointFrequency controls the frequency of frontier checkpoints. -var FrontierCheckpointFrequency = settings.RegisterDurationSetting( +// SpanCheckpointInterval controls how often span-level checkpoints +// can be written. +var SpanCheckpointInterval = settings.RegisterDurationSetting( settings.ApplicationLevel, "changefeed.frontier_checkpoint_frequency", - "controls the frequency with which span level checkpoints will be written; if 0, disabled", + "interval at which span-level checkpoints will be written; "+ + "if 0, span-level checkpoints are disabled", 10*time.Minute, settings.NonNegativeDuration, + settings.WithName("changefeed.span_checkpoint.interval"), ) -// FrontierHighwaterLagCheckpointThreshold controls the amount the high-water -// mark is allowed to lag behind the leading edge of the frontier before we -// begin to attempt checkpointing spans above the high-water mark -var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting( +// SpanCheckpointLagThreshold controls the amount of time a changefeed's +// lagging spans must lag behind its leading spans before a span-level +// checkpoint is written. +var SpanCheckpointLagThreshold = settings.RegisterDurationSetting( settings.ApplicationLevel, "changefeed.frontier_highwater_lag_checkpoint_threshold", - "controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled", + "the amount of time a changefeed's lagging (slowest) spans must lag "+ + "behind its leading (fastest) spans before a span-level checkpoint "+ + "to save leading span progress is written; if 0, span-level checkpoints "+ + "due to lagging spans is disabled", 10*time.Minute, settings.NonNegativeDuration, - settings.WithPublic) + settings.WithPublic, + settings.WithName("changefeed.span_checkpoint.lag_threshold"), +) -// FrontierCheckpointMaxBytes controls the maximum number of key bytes that will be added -// to the checkpoint record. +// SpanCheckpointMaxBytes controls the maximum number of key bytes that will be added +// to a span-level checkpoint record. // Checkpoint record could be fairly large. // Assume we have a 10T table, and a 1/2G max range size: 20K spans. // Span frontier merges adjacent spans, so worst case we have 10K spans. @@ -100,12 +108,13 @@ var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting( // - Assume we want to have at most 150MB worth of checkpoints in the job record. // // Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on the default -// FrontierCheckpointFrequency setting, 1 MB per checkpoint. -var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting( +// SpanCheckpointInterval setting, 1 MB per checkpoint. +var SpanCheckpointMaxBytes = settings.RegisterByteSizeSetting( settings.ApplicationLevel, "changefeed.frontier_checkpoint_max_bytes", - "controls the maximum size of the checkpoint as a total size of key bytes", + "the maximum size of a changefeed span-level checkpoint as measured by the total size of key bytes", 1<<20, // 1 MiB + settings.WithName("changefeed.span_checkpoint.max_bytes"), ) // ScanRequestLimit is the number of Scan requests that can run at once. diff --git a/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go b/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go index aa08933ee509..585da42e1d38 100644 --- a/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go +++ b/pkg/ccl/changefeedccl/checkpoint/checkpoint_test.go @@ -151,7 +151,7 @@ func TestCheckpointCatchupTime(t *testing.T) { defer log.Scope(t).Close(t) const numSpans = 100 - maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Default() + maxBytes := changefeedbase.SpanCheckpointMaxBytes.Default() hwm := hlc.Timestamp{} rng, _ := randutil.NewTestRand() diff --git a/pkg/ccl/changefeedccl/resolvedspan/frontier.go b/pkg/ccl/changefeedccl/resolvedspan/frontier.go index 93e348e04258..ad8d017e212c 100644 --- a/pkg/ccl/changefeedccl/resolvedspan/frontier.go +++ b/pkg/ccl/changefeedccl/resolvedspan/frontier.go @@ -295,9 +295,9 @@ func (f *resolvedSpanFrontier) assertBoundaryNotEarlier( // HasLaggingSpans returns whether the frontier has lagging spans as defined // by whether the frontier trails the latest timestamp by at least -// changefeedbase.FrontierHighwaterLagCheckpointThreshold. +// changefeedbase.SpanCheckpointLagThreshold. func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool { - lagThresholdNanos := int64(changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv)) + lagThresholdNanos := int64(changefeedbase.SpanCheckpointLagThreshold.Get(sv)) if lagThresholdNanos == 0 { return false } diff --git a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed index 7941d3ee1bad..838a3ae43958 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed @@ -446,7 +446,7 @@ CPut /Table/110/1/"@"/1/0 -> nil (tombstone) CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) CPut /Table/110/2/"\xa0"/6/0 -> /BYTES/0x89 -CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a (expecting does not exist) +CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a CPut /Table/110/2/" "/4/0 -> nil (tombstone) CPut /Table/110/2/"@"/4/0 -> nil (tombstone) CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 7359b210bc87..31c674bf7957 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1051,13 +1051,13 @@ func runCDCInitialScanRollingRestart( switch checkpointType { case cdcNormalCheckpoint: setupStmts = append(setupStmts, - `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`, + `SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'`, `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`, ) case cdcShutdownCheckpoint: const largeSplitCount = 5 setupStmts = append(setupStmts, - `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`, + `SET CLUSTER SETTING changefeed.span_checkpoint.interval = '0'`, `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`, // Split some bigger chunks up to scatter it a bit more. fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4), diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go index b33c7af80642..1486f70c8bc4 100644 --- a/pkg/cmd/roachtest/tests/cdc_bench.go +++ b/pkg/cmd/roachtest/tests/cdc_bench.go @@ -173,8 +173,8 @@ func makeCDCBenchOptions(c cluster.Cluster) (option.StartOpts, install.ClusterSe // Checkpoint frequently. Some of the larger benchmarks might overload the // cluster. Producing frequent span-level checkpoints helps with recovery. - settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s" - settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s" + settings.ClusterSettings["changefeed.span_checkpoint.interval"] = "60s" + settings.ClusterSettings["changefeed.span_checkpoint.lag_threshold"] = "30s" // Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default // configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per min minutes). diff --git a/pkg/sql/catalog/bootstrap/kv_writer.go b/pkg/sql/catalog/bootstrap/kv_writer.go index f9f0cf3ccede..4d310ea16d56 100644 --- a/pkg/sql/catalog/bootstrap/kv_writer.go +++ b/pkg/sql/catalog/bootstrap/kv_writer.go @@ -97,7 +97,7 @@ func (w KVWriter) Insert( if kvTrace { log.VEventf(ctx, 2, "CPut %s -> %s", kv.Key, kv.Value) } - b.CPutAllowingIfNotExists(kv.Key, &kv.Value, nil /* expValue */) + b.CPut(kv.Key, &kv.Value, nil /* expValue */) } return nil } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/cascade b/pkg/sql/opt/exec/execbuilder/testdata/cascade index 1caa37fc52bd..9aa082ef2c7d 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/cascade +++ b/pkg/sql/opt/exec/execbuilder/testdata/cascade @@ -175,10 +175,10 @@ Del /Table/112/1/"updated"/0 CPut /Table/112/1/"updated2"/0 -> /TUPLE/ executing cascade for constraint b1_update_cascade_fkey Del /Table/113/2/"updated"/0 -CPut /Table/113/2/"updated2"/0 -> /BYTES/0x1262312d706b310001 (expecting does not exist) +CPut /Table/113/2/"updated2"/0 -> /BYTES/0x1262312d706b310001 executing cascade for constraint b2_update_cascade_fkey Del /Table/114/2/"updated"/0 -CPut /Table/114/2/"updated2"/0 -> /BYTES/0x1262322d706b310001 (expecting does not exist) +CPut /Table/114/2/"updated2"/0 -> /BYTES/0x1262322d706b310001 executing cascade for constraint c1_update_cascade_fkey executing cascade for constraint c2_update_cascade_fkey executing cascade for constraint c3_id_fkey @@ -463,10 +463,10 @@ Del /Table/134/1/"original"/0 CPut /Table/134/1/"updated"/0 -> /TUPLE/ executing cascade for constraint b1_update_cascade_fkey Del /Table/135/2/"original"/0 -CPut /Table/135/2/"updated"/0 -> /BYTES/0x1262312d706b310001 (expecting does not exist) +CPut /Table/135/2/"updated"/0 -> /BYTES/0x1262312d706b310001 executing cascade for constraint b2_update_cascade_fkey Del /Table/136/2/"original"/0 -CPut /Table/136/2/"updated"/0 -> /BYTES/0x1262322d706b310001 (expecting does not exist) +CPut /Table/136/2/"updated"/0 -> /BYTES/0x1262322d706b310001 executing cascade for constraint c1_update_set_null_fkey executing cascade for constraint c2_update_set_null_fkey executing cascade for constraint c3_update_set_null_fkey @@ -751,10 +751,10 @@ Del /Table/156/1/"original"/0 CPut /Table/156/1/"updated"/0 -> /TUPLE/ executing cascade for constraint b1_update_cascade_fkey Del /Table/157/2/"original"/0 -CPut /Table/157/2/"updated"/0 -> /BYTES/0x1262312d706b310001 (expecting does not exist) +CPut /Table/157/2/"updated"/0 -> /BYTES/0x1262312d706b310001 executing cascade for constraint b2_update_cascade_fkey Del /Table/158/2/"original"/0 -CPut /Table/158/2/"updated"/0 -> /BYTES/0x1262322d706b310001 (expecting does not exist) +CPut /Table/158/2/"updated"/0 -> /BYTES/0x1262322d706b310001 executing cascade for constraint c1_update_set_null_fkey executing cascade for constraint c2_update_set_null_fkey executing cascade for constraint c3_update_set_null_fkey diff --git a/pkg/sql/opt/exec/execbuilder/testdata/partial_index b/pkg/sql/opt/exec/execbuilder/testdata/partial_index index 685ea575d905..2831023cfb0f 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/partial_index +++ b/pkg/sql/opt/exec/execbuilder/testdata/partial_index @@ -343,8 +343,8 @@ UPDATE t SET b = 11 WHERE a = 5 Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/11/1:3:Bytes/baz Del /Table/112/2/4/5/0 -CPut /Table/112/2/11/5/0 -> /BYTES/ (expecting does not exist) -CPut /Table/112/3/11/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/11/5/0 -> /BYTES/ +CPut /Table/112/3/11/5/0 -> /BYTES/ # Update a row that matches the first partial index before and after the update # and the index entry does not change. @@ -362,9 +362,9 @@ UPDATE t SET b = 12 WHERE a = 6 Scan /Table/112/1/6/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/6/0 -> /TUPLE/2:2:Int/12/1:3:Bytes/baz Del /Table/112/2/11/6/0 -CPut /Table/112/2/12/6/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/12/6/0 -> /BYTES/ Del /Table/112/3/11/6/0 -CPut /Table/112/3/12/6/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/3/12/6/0 -> /BYTES/ # Update a row that matches the first partial index before the update, but does # not match after the update. @@ -374,7 +374,7 @@ UPDATE t SET b = 9 WHERE a = 6 Scan /Table/112/1/6/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/6/0 -> /TUPLE/2:2:Int/9/1:3:Bytes/baz Del /Table/112/2/12/6/0 -CPut /Table/112/2/9/6/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/9/6/0 -> /BYTES/ Del /Table/112/3/12/6/0 # Update a row that matches both partial indexes before the update, the first @@ -385,9 +385,9 @@ UPDATE t SET c = 'baz', b = 12 WHERE a = 13 Scan /Table/112/1/13/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/13/0 -> /TUPLE/2:2:Int/12/1:3:Bytes/baz Del /Table/112/2/11/13/0 -CPut /Table/112/2/12/13/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/12/13/0 -> /BYTES/ Del /Table/112/3/11/13/0 -CPut /Table/112/3/12/13/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/3/12/13/0 -> /BYTES/ Del /Table/112/4/"foo"/13/0 # Reversing the previous update should reverse the partial index changes. @@ -397,10 +397,10 @@ UPDATE t SET c = 'foo', b = 11 WHERE a = 13 Scan /Table/112/1/13/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/13/0 -> /TUPLE/2:2:Int/11/1:3:Bytes/foo Del /Table/112/2/12/13/0 -CPut /Table/112/2/11/13/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/11/13/0 -> /BYTES/ Del /Table/112/3/12/13/0 -CPut /Table/112/3/11/13/0 -> /BYTES/ (expecting does not exist) -CPut /Table/112/4/"foo"/13/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/3/11/13/0 -> /BYTES/ +CPut /Table/112/4/"foo"/13/0 -> /BYTES/ # Update a row to match a partial index that does not index the column # referenced in predicate. @@ -412,7 +412,7 @@ UPDATE u SET v = 11 WHERE k = 1 ---- Scan /Table/113/1/1/0 lock Exclusive (Block, Unreplicated) Put /Table/113/1/1/0 -> /TUPLE/2:2:Int/2/1:3:Int/11 -CPut /Table/113/2/2/1/0 -> /BYTES/ (expecting does not exist) +CPut /Table/113/2/2/1/0 -> /BYTES/ # Update a row to no longer match a partial index that does not index the column # referenced in predicate. @@ -572,7 +572,7 @@ INSERT INTO t VALUES (5, 3, 'foo') ON CONFLICT (a) DO UPDATE SET b = 3 Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/3/1:3:Bytes/bar Del /Table/112/2/4/5/0 -CPut /Table/112/2/3/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/3/5/0 -> /BYTES/ # Insert a conflicting row that does not match the first partial index # before the update, but does match after the update. @@ -582,8 +582,8 @@ INSERT INTO t VALUES (5, 7, 'foo') ON CONFLICT (a) DO UPDATE SET b = 11 Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/11/1:3:Bytes/bar Del /Table/112/2/3/5/0 -CPut /Table/112/2/11/5/0 -> /BYTES/ (expecting does not exist) -CPut /Table/112/3/11/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/11/5/0 -> /BYTES/ +CPut /Table/112/3/11/5/0 -> /BYTES/ # Insert a conflicting row that currently matches the first partial index before # the update. Update the row so that the row no longer matches the first partial @@ -594,9 +594,9 @@ INSERT INTO t VALUES (5, 11, 'bar') ON CONFLICT (a) DO UPDATE SET b = 4, c = 'fo Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/4/1:3:Bytes/foo Del /Table/112/2/11/5/0 -CPut /Table/112/2/4/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/4/5/0 -> /BYTES/ Del /Table/112/3/11/5/0 -CPut /Table/112/4/"foo"/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/4/"foo"/5/0 -> /BYTES/ # Insert a conflicting row that that matches the second partial index before and # after the update and the index entry does not change. @@ -606,7 +606,7 @@ INSERT INTO t VALUES (5, 11, 'bar') ON CONFLICT (a) DO UPDATE SET b = 3 Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/3/1:3:Bytes/foo Del /Table/112/2/4/5/0 -CPut /Table/112/2/3/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/3/5/0 -> /BYTES/ # Insert a conflicting row that that matches the second partial index before and # after the update and the index entry changes. @@ -616,7 +616,7 @@ INSERT INTO t VALUES (5, 11, 'bar') ON CONFLICT (a) DO UPDATE SET c = 'foobar' Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/3/1:3:Bytes/foobar Del /Table/112/4/"foo"/5/0 -CPut /Table/112/4/"foobar"/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/4/"foobar"/5/0 -> /BYTES/ # Insert a non-conflicting row that matches the first partial index. query T kvtrace @@ -660,7 +660,7 @@ INSERT INTO u VALUES (2, 3, 11) ON CONFLICT (k) DO UPDATE SET u = 4, v = 12 Scan /Table/113/1/2/0 lock Exclusive (Block, Unreplicated) Put /Table/113/1/2/0 -> /TUPLE/2:2:Int/4/1:3:Int/12 Del /Table/113/2/3/2/0 -CPut /Table/113/2/4/2/0 -> /BYTES/ (expecting does not exist) +CPut /Table/113/2/4/2/0 -> /BYTES/ # --------------------------------------------------------- # INSERT ON CONFLICT DO UPDATE primary key @@ -758,7 +758,7 @@ UPSERT INTO t VALUES (5, 3, 'bar') Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/3/1:3:Bytes/bar Del /Table/112/2/4/5/0 -CPut /Table/112/2/3/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/3/5/0 -> /BYTES/ # Upsert a conflicting row that does not match the first partial index before # the update, but does match after the update. @@ -768,8 +768,8 @@ UPSERT INTO t VALUES (5, 11, 'bar') Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/11/1:3:Bytes/bar Del /Table/112/2/3/5/0 -CPut /Table/112/2/11/5/0 -> /BYTES/ (expecting does not exist) -CPut /Table/112/3/11/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/11/5/0 -> /BYTES/ +CPut /Table/112/3/11/5/0 -> /BYTES/ # Upsert a conflicting row that currently matches the first partial index before # the update. Update the row so that the row no longer matches the first partial @@ -780,9 +780,9 @@ UPSERT INTO t VALUES (5, 3, 'foo') Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/3/1:3:Bytes/foo Del /Table/112/2/11/5/0 -CPut /Table/112/2/3/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/3/5/0 -> /BYTES/ Del /Table/112/3/11/5/0 -CPut /Table/112/4/"foo"/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/4/"foo"/5/0 -> /BYTES/ # Upsert a conflicting row that that matches the second partial index before and # after the update and the index entry does not change. @@ -792,7 +792,7 @@ UPSERT INTO t VALUES (5, 4, 'foo') Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/4/1:3:Bytes/foo Del /Table/112/2/3/5/0 -CPut /Table/112/2/4/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/4/5/0 -> /BYTES/ # Upsert a conflicting row that that matches the second partial index before and # after the update and the index entry changes. @@ -802,7 +802,7 @@ UPSERT INTO t VALUES (5, 4, 'foobar') Scan /Table/112/1/5/0 lock Exclusive (Block, Unreplicated) Put /Table/112/1/5/0 -> /TUPLE/2:2:Int/4/1:3:Bytes/foobar Del /Table/112/4/"foo"/5/0 -CPut /Table/112/4/"foobar"/5/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/4/"foobar"/5/0 -> /BYTES/ # Upsert a non-conflicting row that matches the first partial index. query T kvtrace @@ -846,7 +846,7 @@ UPSERT INTO u VALUES (2, 4, 12) Scan /Table/113/1/2/0 lock Exclusive (Block, Unreplicated) Put /Table/113/1/2/0 -> /TUPLE/2:2:Int/4/1:3:Int/12 Del /Table/113/2/3/2/0 -CPut /Table/113/2/4/2/0 -> /BYTES/ (expecting does not exist) +CPut /Table/113/2/4/2/0 -> /BYTES/ # Tests for partial inverted indexes. diff --git a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families index 5a9707457255..4ee6bea32c5a 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families +++ b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families @@ -228,10 +228,10 @@ SELECT y FROM t@i WHERE x IS NULL Scan /Table/107/2/{NULL-!NULL} # Ensure that updates only touch the changed column families. -query T kvtrace(CPut,prefix=/Table/107/2/) +query T kvtrace(Put,prefix=/Table/107/2/) UPDATE t SET y = 5 WHERE x = 1 ---- -CPut /Table/107/2/1/1/1 -> /TUPLE/2:2:Int/5 (replacing /TUPLE/2:2:Int/2, if exists) +Put /Table/107/2/1/1/1 -> /TUPLE/2:2:Int/5 # Test composite datatypes. statement ok @@ -404,9 +404,9 @@ CPut /Table/112/3/2/5/1 -> /TUPLE/8:8:Int/8 query T kvtrace(Put,Del,CPut,prefix=/Table/112/2/) UPDATE t SET b = 4, c = NULL, d = NULL, e = 7, f = NULL WHERE y = 2 ---- -CPut /Table/112/2/2/1/2/1 -> /TUPLE/3:3:Int/3/1:4:Int/4 (replacing /TUPLE/3:3:Int/3, if exists) +Put /Table/112/2/2/1/2/1 -> /TUPLE/3:3:Int/3/1:4:Int/4 Del /Table/112/2/2/1/3/1 -CPut /Table/112/2/2/1/4/1 -> /TUPLE/7:7:Int/7 (expecting does not exist) +CPut /Table/112/2/2/1/4/1 -> /TUPLE/7:7:Int/7 Del /Table/112/2/2/1/5/1 query IIIIIIII @@ -422,10 +422,10 @@ INSERT INTO t VALUES (3, 3, NULL, NULL, NULL, NULL, NULL, NULL) query T kvtrace(Put,Del,CPut,prefix=/Table/112/2/) UPDATE t SET a = 10, b = 11, c = 12, d = 13, e = 14, f = 15 WHERE y = 3 ---- -CPut /Table/112/2/3/3/2/1 -> /TUPLE/3:3:Int/10/1:4:Int/11 (expecting does not exist) -CPut /Table/112/2/3/3/3/1 -> /TUPLE/5:5:Int/12/1:6:Int/13 (expecting does not exist) -CPut /Table/112/2/3/3/4/1 -> /TUPLE/7:7:Int/14 (expecting does not exist) -CPut /Table/112/2/3/3/5/1 -> /TUPLE/8:8:Int/15 (expecting does not exist) +CPut /Table/112/2/3/3/2/1 -> /TUPLE/3:3:Int/10/1:4:Int/11 +CPut /Table/112/2/3/3/3/1 -> /TUPLE/5:5:Int/12/1:6:Int/13 +CPut /Table/112/2/3/3/4/1 -> /TUPLE/7:7:Int/14 +CPut /Table/112/2/3/3/5/1 -> /TUPLE/8:8:Int/15 # Test a case where the update causes all k/v's other than # the sentinel k/v to get deleted. @@ -447,13 +447,13 @@ query T kvtrace(Put,Del,CPut,prefix=/Table/112/2/) UPDATE t SET y = 22 WHERE y = 21 ---- Del /Table/112/2/21/20/0 -CPut /Table/112/2/22/20/0 -> /BYTES/ (expecting does not exist) +CPut /Table/112/2/22/20/0 -> /BYTES/ Del /Table/112/2/21/20/2/1 -CPut /Table/112/2/22/20/2/1 -> /TUPLE/3:3:Int/22 (expecting does not exist) +CPut /Table/112/2/22/20/2/1 -> /TUPLE/3:3:Int/22 Del /Table/112/2/21/20/3/1 -CPut /Table/112/2/22/20/3/1 -> /TUPLE/6:6:Int/25 (expecting does not exist) +CPut /Table/112/2/22/20/3/1 -> /TUPLE/6:6:Int/25 Del /Table/112/2/21/20/5/1 -CPut /Table/112/2/22/20/5/1 -> /TUPLE/8:8:Int/27 (expecting does not exist) +CPut /Table/112/2/22/20/5/1 -> /TUPLE/8:8:Int/27 # Ensure that the final results on both indexes make sense. query IIIIIIII rowsort @@ -489,10 +489,10 @@ query T kvtrace(Put,CPut,Del,prefix=/Table/113/2/) UPDATE t SET y = 5 where y = 2 ---- Del /Table/113/2/2/1/0 -CPut /Table/113/2/5/1/0 -> /BYTES/0x33061308 (expecting does not exist) +CPut /Table/113/2/5/1/0 -> /BYTES/0x33061308 # Changing the value just results in a cput. query T kvtrace(Put,Del,CPut,prefix=/Table/113/2/) UPDATE t SET z = 5 where y = 5 ---- -CPut /Table/113/2/5/1/0 -> /BYTES/0x330a1308 (replacing /BYTES/0x33061308, if exists) +Put /Table/113/2/5/1/0 -> /BYTES/0x330a1308 diff --git a/pkg/sql/opt/exec/execbuilder/testdata/upsert b/pkg/sql/opt/exec/execbuilder/testdata/upsert index 32d970efe426..d30539b525c2 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/upsert +++ b/pkg/sql/opt/exec/execbuilder/testdata/upsert @@ -1099,7 +1099,7 @@ colbatchscan Scan /Table/120/1/2/0 lock Exclusive (Block, Unreplicated) colbatchscan fetched: /kv/kv_pkey/2/v -> /3 count Put /Table/120/1/2/0 -> /TUPLE/2:2:Int/2 count Del /Table/120/2/3/0 -count CPut /Table/120/2/2/0 -> /BYTES/0x8a (expecting does not exist) +count CPut /Table/120/2/2/0 -> /BYTES/0x8a sql query execution failed after 0 rows: duplicate key value violates unique constraint "woo" # Test that implicit SELECT FOR UPDATE doesn't spread to subqueries. diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 55cb1e782c0b..7eed190609a1 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -398,18 +398,18 @@ func (ru *Updater) UpdateRow( oldEntry, newEntry := &oldEntries[oldIdx], &newEntries[newIdx] if oldEntry.Family == newEntry.Family { // If the families are equal, then check if the keys have changed. If so, delete the old key. - // Then, issue a CPut for the new value of the key if the value has changed. + // Then, issue a CPut for the new key or a Put if only the value has changed. // Because the indexes will always have a k/v for family 0, it suffices to only // add foreign key checks in this case, because we are guaranteed to enter here. oldIdx++ newIdx++ - var expValue []byte + var sameKey bool if !bytes.Equal(oldEntry.Key, newEntry.Key) { if err := ru.Helper.deleteIndexEntry(ctx, batch, index, ru.Helper.secIndexValDirs[i], oldEntry, traceKV); err != nil { return nil, err } } else if !newEntry.Value.EqualTagAndData(oldEntry.Value) { - expValue = oldEntry.Value.TagAndDataBytes() + sameKey = true } else if !index.IsTemporaryIndexForBackfill() { // If this is a temporary index for backfill, we want to make sure we write out all // index values even in the case where they should be the same. We do this because the @@ -428,13 +428,17 @@ func (ru *Updater) UpdateRow( if traceKV { k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) v := newEntry.Value.PrettyPrint() - if expValue != nil { - log.VEventf(ctx, 2, "CPut %s -> %v (replacing %v, if exists)", k, v, oldEntry.Value.PrettyPrint()) + if sameKey { + log.VEventf(ctx, 2, "Put %s -> %v", k, v) } else { - log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) + log.VEventf(ctx, 2, "CPut %s -> %v", k, v) } } - batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue) + if sameKey { + batch.Put(newEntry.Key, &newEntry.Value) + } else { + batch.CPut(newEntry.Key, &newEntry.Value, nil /* expValue */) + } } writtenIndexes.Add(i) } else if oldEntry.Family < newEntry.Family { @@ -468,7 +472,7 @@ func (ru *Updater) UpdateRow( if traceKV { k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) v := newEntry.Value.PrettyPrint() - log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) + log.VEventf(ctx, 2, "CPut %s -> %v", k, v) } batch.CPut(newEntry.Key, &newEntry.Value, nil) } @@ -502,7 +506,7 @@ func (ru *Updater) UpdateRow( if traceKV { k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) v := newEntry.Value.PrettyPrint() - log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) + log.VEventf(ctx, 2, "CPut %s -> %v", k, v) } batch.CPut(newEntry.Key, &newEntry.Value, nil) } diff --git a/pkg/sql/testdata/index_mutations/merging b/pkg/sql/testdata/index_mutations/merging index fecbdff32eb8..ae2b8ef81f09 100644 --- a/pkg/sql/testdata/index_mutations/merging +++ b/pkg/sql/testdata/index_mutations/merging @@ -148,7 +148,7 @@ UPDATE tefp SET a = a + 1, b = b + 100 Scan /Table/108/{1-2} lock Exclusive (Block, Unreplicated) Put /Table/108/1/1/0 -> /TUPLE/2:2:Int/3/1:3:Int/200 Del /Table/108/2/102/0 -CPut /Table/108/2/203/0 -> /BYTES/0x89 (expecting does not exist) +CPut /Table/108/2/203/0 -> /BYTES/0x89 # Update a row with different values without changing the index entry. kvtrace