137947: ccl/changefeedccl: Add changefeed options into nemesis tests r=wenyihu6 a=aerfrei

This work ensures our nemesis tests for changefeeds randomize over the
options used at changefeed creation. It randomly adds the key_in_value
option (see below) and the full_table_name option half of the time and
checks that the changefeed messages respect them in the beforeAfter
validator.

Note the following limitation: when on, the full_table_name option
asserts that the topic in the output is d.public.{table_name} instead of
checking the actual database/schema name.

This change also does not add the key_in_value option for the webhook
and cloudstorage sinks. Since key_in_value is on by default for those
sinks, we already remove the key from the value in their testfeed
messages for ease of testing. Unfortunately, this makes these cases hard
to test, so we leave them out for now.

See also: #134119

Epic: [CRDB-42866](https://cockroachlabs.atlassian.net/browse/CRDB-42866)

Release note: None

138243: changefeedccl: fix PTS test r=stevendanna a=asg0451

Fix failing TestPTSRecordProtectsTargetsAndSystemTables test

Fixes: #135639
Fixes: #138066
Fixes: #137885
Fixes: #137505
Fixes: #136396
Fixes: #135805

Release note: None

138697: crosscluster: add crdb_route parameter for LDR and PCR r=jeffswenson a=jeffswenson

The `crdb_route` query parameter determines how the destination
cluster's stream processor connects to the source cluster. There are two
options for the query parameter: "node" and "gateway". Here is an
example of using the route parameter to create an external connection
that is usable for LDR or PCR.

```SQL
-- A connection that routes all replication traffic via the configured
-- connection URI.
CREATE EXTERNAL CONNECTION 'external://source-db' AS
'postgresql://user:[email protected]:26257/sslmode=verify-full&crdb_route=gateway'

-- A connection that enumerates nodes in the source cluster and connects
-- directly to nodes.
CREATE EXTERNAL CONNECTION 'external://source-db' AS
'postgresql://user:[email protected]:26257/sslmode=verify-full&crdb_route=node'
```

The "node" option is the original and default behavior. It requires the
source and destination clusters to be in the same IP network. The
connection string supplied to LDR and PCR is used to connect to the
source cluster and generate a physical SQL plan for the replication. The
physical plan includes the `--sql-advertise-addr` of each node in the
source cluster, and processors in the destination cluster connect
directly to those nodes. "node" routing is ideal because there are no
extra network hops and the source cluster can control how load is
distributed across its nodes.

The "gateway" option is a new option introduced to support routing PCR
and LDR over a load balancer. When specified, the
destination cluster ignores the node addresses returned by the physical
plan and instead opens a connection for each processor to the URI
supplied by the user. This introduces an extra network hop and does not
distribute load as evenly, but it works in deployments where the source
cluster is only reachable over a load balancer.

Routing over a load balancer only requires changing the destination
cluster's behavior. Nodes in the source cluster were always implemented
to act as gateways and serve rangefeeds backed by data stored on
different nodes. This support exists so that cross-cluster replication
does not need to re-plan every time a range moves to a different node.

Release note (sql change): LDR and PCR may use the `crdb_route=gateway`
query option to route the replication streams over a load balancer.

Epic: [CRDB-40896](https://cockroachlabs.atlassian.net/browse/CRDB-40896)

138877: opt: reduce allocations when filtering histogram buckets r=mgartner a=mgartner

`cat.HistogramBuckets` are now returned and passed by value in
`getFilteredBucket` and `(*Histogram).addBucket`, respectively,
eliminating some heap allocations.

Also, two allocations when building spans from buckets via the
`spanBuilder` have been combined into one. The new `(*spanBuilder).init`
method simplifies the API by no longer requiring that prefix datums are
passed to every invocation of `makeSpanFromBucket`. This also reduces
redundant copying of the prefix.

Epic: None

Release note: None


139029: sql/logictest: disable column family mutations in some cases r=mgartner a=mgartner

Random column family mutations are now disabled for `CREATE TABLE`
statements with unique, hash-sharded indexes. This prevents the AST
from being reserialized with a `UNIQUE` constraint carrying invalid
options instead of the original `UNIQUE INDEX`. See #65929 and #107398.

Epic: None

Release note: None


139036: testutils,kvserver: add StartExecTrace and adopt in TestPromoteNonVoterInAddVoter r=tbg a=tbg

Every so often we end up with tests that fail once in a blue moon and cannot be reproduced at will.
#138864 was one of them, and execution traces helped a great deal.

This PR introduces a helper for unit tests that execution-traces the test and keeps the trace on failure, and adopts it for one of these pesky unit tests.

The trace contains the goroutine ID in the filename. Additionally, the test's main goroutine is marked via a trace region. Sample below:

<img width="1226" alt="image" src="https://github.com/user-attachments/assets/3f641c28-64f7-4fba-9267-ddd48d8dda03" />

Closes #134383.

Epic: None
Release note: None


Co-authored-by: Aerin Freilich <[email protected]>
Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
6 people committed Jan 14, 2025
7 parents 244d979 + 7d9b214 + 812ae98 + 77b879c + 34e39c0 + f7a3d5d + c8d3af7 commit 9d7b5cb
Showing 43 changed files with 790 additions and 365 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -288,6 +288,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql",
"//pkg/sql/catalog",
72 changes: 62 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -19,6 +19,53 @@ import (
"github.com/cockroachdb/errors"
)

type ChangefeedOption struct {
FullTableName bool
Format string
KeyInValue bool
}

func newChangefeedOption(testName string) ChangefeedOption {
isCloudstorage := strings.Contains(testName, "cloudstorage")
isWebhook := strings.Contains(testName, "webhook")
cfo := ChangefeedOption{
FullTableName: rand.Intn(2) < 1,

// Because key_in_value is on by default for cloudstorage and webhook sinks,
// the key in the value is extracted and removed from the test feed
// messages (see extractKeyFromJSONValue function).
// TODO: (#138749) enable testing key_in_value for cloudstorage
// and webhook sinks
KeyInValue: !isCloudstorage && !isWebhook && rand.Intn(2) < 1,
Format: "json",
}

if isCloudstorage && rand.Intn(2) < 1 {
cfo.Format = "parquet"
}

return cfo
}

func (co ChangefeedOption) String() string {
return fmt.Sprintf("full_table_name=%t,key_in_value=%t,format=%s",
co.FullTableName, co.KeyInValue, co.Format)
}

func (cfo ChangefeedOption) OptionString() string {
options := ""
if cfo.Format == "parquet" {
options = ", format=parquet"
}
if cfo.FullTableName {
options = options + ", full_table_name"
}
if cfo.KeyInValue {
options = options + ", key_in_value"
}
return options
}

type NemesesOption struct {
EnableFpValidator bool
EnableSQLSmith bool
@@ -36,7 +83,8 @@ var NemesesOptions = []NemesesOption{
}

func (no NemesesOption) String() string {
return fmt.Sprintf("fp_validator=%t,sql_smith=%t", no.EnableFpValidator, no.EnableSQLSmith)
return fmt.Sprintf("fp_validator=%t,sql_smith=%t",
no.EnableFpValidator, no.EnableSQLSmith)
}

// RunNemesis runs a jepsen-style validation of whether a changefeed meets our
@@ -50,8 +98,7 @@ func (no NemesesOption) String() string {
func RunNemesis(
f TestFeedFactory,
db *gosql.DB,
isSinkless bool,
isCloudstorage bool,
testName string,
withLegacySchemaChanger bool,
rng *rand.Rand,
nOp NemesesOption,
@@ -69,6 +116,8 @@
ctx := context.Background()

eventPauseCount := 10

isSinkless := strings.Contains(testName, "sinkless")
if isSinkless {
// Disable eventPause for sinkless changefeeds because we currently do not
// have "correct" pause and unpause mechanisms for changefeeds that aren't
@@ -199,11 +248,13 @@
}
}

withFormatParquet := ""
if isCloudstorage && rand.Intn(2) < 1 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
cfo := newChangefeedOption(testName)
changefeedStatement := fmt.Sprintf(
`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`,
cfo.OptionString(),
)
log.Infof(ctx, "Using changefeed options: %s", changefeedStatement)
foo, err := f.Feed(changefeedStatement)
if err != nil {
return nil, err
}
@@ -218,7 +269,8 @@
if _, err := db.Exec(createFprintStmtBuf.String()); err != nil {
return nil, err
}
baV, err := NewBeforeAfterValidator(db, `foo`)

baV, err := NewBeforeAfterValidator(db, `foo`, cfo)
if err != nil {
return nil, err
}
@@ -817,7 +869,7 @@ func noteFeedMessage(a fsm.Args) error {
}
ns.availableRows--
log.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts, m.Topic)
}
}
}
65 changes: 55 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/validator.go
@@ -23,7 +23,7 @@ import (
// guarantees in a single table.
type Validator interface {
// NoteRow accepts a changed row entry.
NoteRow(partition string, key, value string, updated hlc.Timestamp) error
NoteRow(partition, key, value string, updated hlc.Timestamp, topic string) error
// NoteResolved accepts a resolved timestamp entry.
NoteResolved(partition string, resolved hlc.Timestamp) error
// Failures returns any violations seen so far.
@@ -64,7 +64,7 @@ var _ StreamValidator = &orderValidator{}
type noOpValidator struct{}

// NoteRow accepts a changed row entry.
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp) error { return nil }
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp, string) error { return nil }

// NoteResolved accepts a resolved timestamp entry.
func (v *noOpValidator) NoteResolved(string, hlc.Timestamp) error { return nil }
@@ -125,7 +125,9 @@ func (v *orderValidator) GetValuesForKeyBelowTimestamp(
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *orderValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
v.failures = append(v.failures, fmt.Sprintf(
`key [%s] received on two partitions: %s and %s`, key, prev, partition,
@@ -189,14 +191,18 @@ type beforeAfterValidator struct {
table string
primaryKeyCols []string
resolved map[string]hlc.Timestamp
fullTableName bool
keyInValue bool

failures []string
}

// NewBeforeAfterValidator returns a Validator verifies that the "before" and
// "after" fields in each row agree with the source table when performing AS OF
// SYSTEM TIME lookups before and at the row's timestamp.
func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
func NewBeforeAfterValidator(
sqlDB *gosql.DB, table string, option ChangefeedOption,
) (Validator, error) {
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table)
if err != nil {
return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed")
@@ -205,15 +211,30 @@ func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
return &beforeAfterValidator{
sqlDB: sqlDB,
table: table,
fullTableName: option.FullTableName,
keyInValue: option.KeyInValue,
primaryKeyCols: primaryKeyCols,
resolved: make(map[string]hlc.Timestamp),
}, nil
}

// NoteRow implements the Validator interface.
func (v *beforeAfterValidator) NoteRow(
partition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.fullTableName {
if topic != fmt.Sprintf(`d.public.%s`, v.table) {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table,
))
}
} else {
if topic != v.table {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table %s", topic, v.table,
))
}
}
keyJSON, err := json.ParseJSON(key)
if err != nil {
return err
@@ -230,6 +251,26 @@
return err
}

if v.keyInValue {
keyString := keyJSON.String()
keyInValueJSON, err := valueJSON.FetchValKey("key")
if err != nil {
return err
}

if keyInValueJSON == nil {
v.failures = append(v.failures, fmt.Sprintf(
"no key in value, expected key value %s", keyString))
} else {
keyInValueString := keyInValueJSON.String()
if keyInValueString != keyString {
v.failures = append(v.failures, fmt.Sprintf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString))
}
}
}

afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
@@ -451,7 +492,7 @@ func (v *FingerprintValidator) DBFunc(

// NoteRow implements the Validator interface.
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
v.firstRowTimestamp = updated
@@ -663,9 +704,11 @@ func (v *FingerprintValidator) Failures() []string {
type Validators []Validator

// NoteRow implements the Validator interface.
func (vs Validators) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (vs Validators) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
for _, v := range vs {
if err := v.NoteRow(partition, key, value, updated); err != nil {
if err := v.NoteRow(partition, key, value, updated, topic); err != nil {
return err
}
}
@@ -707,10 +750,12 @@ func NewCountValidator(v Validator) *CountValidator {
}

// NoteRow implements the Validator interface.
func (v *CountValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *CountValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
v.NumRows++
v.rowsSinceResolved++
return v.v.NoteRow(partition, key, value, updated)
return v.v.NoteRow(partition, key, value, updated, topic)
}

// NoteResolved implements the Validator interface.
