132967: schemachanger: add support for discarding database/table zc r=rafiss a=annrpom

### scbuild: version gate SetZoneConfig with isV243Active
Support for subzone configs was added to `SetZoneConfig` in 24.3. Since this
was a significant change, we gate `SetZoneConfig` on the 24.3 version to
minimize the risk of compatibility issues with earlier versions.
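
A minimal, self-contained sketch of the gating pattern (the toy `buildCtx`, `isV243Active` helper, and fallback error below are illustrative stand-ins, not the real scbuild types):

```go
package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the cluster version and build context; the real types
// live in pkg/clusterversion and pkg/sql/schemachanger/scbuild.
type clusterVersion int

const (
	v242 clusterVersion = iota
	v243
)

type buildCtx struct{ active clusterVersion }

func (b buildCtx) isV243Active() bool { return b.active >= v243 }

// setZoneConfig bails out of the declarative path on clusters that have not
// finalized 24.3, so the statement can fall back to the legacy schema changer.
func setZoneConfig(b buildCtx) error {
	if !b.isV243Active() {
		return errors.New("declarative SET ZONE CONFIG requires cluster version 24.3")
	}
	// ... build declarative zone-config elements as before ...
	return nil
}

func main() {
	fmt.Println(setZoneConfig(buildCtx{active: v242})) // error: falls back
	fmt.Println(setZoneConfig(buildCtx{active: v243})) // <nil>: proceeds
}
```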

Epic: none

Release note: None

---

### schemachanger: re-enable dsc zone config
Since 24.3 has been cut, we can turn zone configs
on in the declarative schema changer by default.
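
For illustration, a hedged sketch of what flipping such a default looks like; the setting key, description, and package below are assumptions, not the actual setting:

```go
package sqlsettings

import "github.com/cockroachdb/cockroach/pkg/settings"

// useDeclarativeZoneConfigs is an illustrative setting; the key and
// description are assumed. The relevant part is the default changing to
// true once the 24.3 release has been cut.
var useDeclarativeZoneConfigs = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"sql.schemachanger.declarative_zone_configs.enabled", // assumed key
	"use the declarative schema changer for zone config statements",
	true, // previously false
)
```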

Epic: none

Release note: None

---

### scexec: extend UpdateZoneConfig to allow for deletes
This patch extends UpdateZoneConfig to allow for deletes in
the system.zones table.
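
A rough sketch of the shape of that extension, using a simplified update type and an assumed SQL-runner signature rather than the real scexec API:

```go
package scexecsketch

import "context"

// zoneConfigUpdate is a simplified stand-in for the scexec operation: a nil
// config is now interpreted as a delete of the system.zones row rather than
// an upsert.
type zoneConfigUpdate struct {
	descID int64
	config []byte // serialized zone config; nil means delete
}

// exec runs SQL against an internal executor; the signature is assumed for
// this sketch.
type exec func(ctx context.Context, stmt string, args ...interface{}) error

func applyZoneConfigUpdate(ctx context.Context, run exec, u zoneConfigUpdate) error {
	if u.config == nil {
		// DISCARD path: remove the row entirely.
		return run(ctx, "DELETE FROM system.zones WHERE id = $1", u.descID)
	}
	// SET path: write (or overwrite) the row.
	return run(ctx, "UPSERT INTO system.zones (id, config) VALUES ($1, $2)", u.descID, u.config)
}
```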

Epic: None

Release note: None

---

### scbuildstmt: refactor zoneConfigObjBuilder method
This patch refactors the `addZoneConfigToBuildCtx` method
of our `zoneConfigObjBuilder` to be less specific; we pull
the `b.add` out so that we can decide in `SetZoneConfig`
whether to add or drop this element.
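
A sketch of the refactor's shape with assumed interface names: the builder only constructs the element, and the caller picks add versus drop:

```go
package scbuildstmtsketch

// element and builder are assumed stand-ins for the real scpb element and
// build-context types.
type element interface{}

type builder interface {
	Add(element)
	Drop(element)
}

// zoneConfigObjBuilder now only constructs the element instead of also
// enqueueing it, which the old addZoneConfigToBuildCtx did.
type zoneConfigObjBuilder interface {
	getZoneConfigElem() element
}

// setZoneConfig decides whether the element is added (SET) or dropped
// (DISCARD), a decision that used to be baked into the builder method.
func setZoneConfig(b builder, obj zoneConfigObjBuilder, discard bool) {
	elem := obj.getZoneConfigElem()
	if discard {
		b.Drop(elem)
	} else {
		b.Add(elem)
	}
}
```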

Epic: none

Release note: None

---

### schemachanger: add support for discarding database/table zc
This patch adds support for discarding a database/table zone config
in the declarative schema changer.
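
The user-facing statements this covers are shown below; the Go wrapper and `run` callback are just an illustrative harness, not part of the change:

```go
package zcdiscardsketch

// discardZoneConfigs shows the statements that now go through the
// declarative schema changer; run is any function that executes SQL
// (assumed for the sketch).
func discardZoneConfigs(run func(sql string) error) error {
	// Discarding a table's zone config drops the zone config element and
	// deletes its system.zones row.
	if err := run(`ALTER TABLE t CONFIGURE ZONE DISCARD`); err != nil {
		return err
	}
	// The same now works for a database.
	return run(`ALTER DATABASE db CONFIGURE ZONE DISCARD`)
}
```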

Informs: #133157

Release note: None

133725: kvserver/rangefeed: add metric for processor-level send timeout r=wenyihu6a a=stevendanna

Epic: none
Release note: None

133848: bazel-github-helper: sort list of failed tests in summary r=rail a=rickystewart

Epic: CRDB-17171
Release note: None
Release justification: Non-production code changes

133874: raft: don't panic on defortifying snapshot from new term r=nvanbenschoten a=nvanbenschoten

Informs #132762.

This commit fixes a bug introduced by 58a9f53. Since we no longer assume that snapshots come from the leader, we stopped de-fortifying raft leadership when receiving a snapshot. As a result, if a snapshot arrived from a new term, we would hit an assertion failure, which the new test reproduces.

This commit addresses this bug by distinguishing between messages that are always sent from the leader of the message's term and messages that indicate that there is a new leader of the message's term, even if the message is not from the leader itself.

This is temporary, and can be removed when we address #127349.

Release note: None

133878: revert "logictest: deflake TestLogic_union" r=yuzefovich a=yuzefovich

This reverts commit 6218f23.

I don't think this commit did anything to remove or reduce flakiness in the union logic test because the error occurs on the SELECT query _after_ the DDL and DML statements modified by this commit. Reverting it to allow for cleaner backports.

Epic: None
Release note: None

Co-authored-by: Annie Pompa <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
6 people committed Oct 30, 2024
6 parents a94d017 + 7aba686 + 3bf870a + aa453b8 + 8105a38 + 6db8137 commit ef26d15
Showing 36 changed files with 706 additions and 76 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -250,6 +250,7 @@
<tr><td>STORAGE</td><td>kv.rangefeed.processors_goroutine</td><td>Number of active RangeFeed processors using goroutines</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.processors_scheduler</td><td>Number of active RangeFeed processors using scheduler</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.registrations</td><td>Number of active RangeFeed registrations</td><td>Registrations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduled_processor.queue_timeout</td><td>Number of times the RangeFeed processor shut down because of a queue send timeout</td><td>Failure Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.normal.latency</td><td>KV RangeFeed normal scheduler latency</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.normal.queue_size</td><td>Number of entries in the KV RangeFeed normal scheduler queue</td><td>Pending Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.system.latency</td><td>KV RangeFeed system scheduler latency</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
28 changes: 28 additions & 0 deletions pkg/ccl/schemachangerccl/backup_base_generated_test.go

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions pkg/cmd/bazci/bazel-github-helper/main.go
@@ -24,6 +24,7 @@ import (
"fmt"
"os"
"os/exec"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/build/engflow"
@@ -182,6 +183,18 @@ func dumpSummary(f *os.File, invocation *engflow.InvocationInfo) error {
}
}

sort.Slice(failedTests, func(i, j int) bool {
t1 := failedTests[i]
t2 := failedTests[j]
if t1.label < t2.label {
return true
} else if t1.label == t2.label {
return t1.name < t2.name
} else {
return false
}
})

if len(failedTests) != 0 {
_, err := f.WriteString(`| Label | TestName | Status | Link |
| --- | --- | --- | --- |
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/rangefeed/metrics.go
@@ -76,12 +76,19 @@ var (
Measurement: "Pending Ranges",
Unit: metric.Unit_COUNT,
}
metaQueueTimeout = metric.Metadata{
Name: "kv.rangefeed.scheduled_processor.queue_timeout",
Help: "Number of times the RangeFeed processor shutdown because of a queue send timeout",
Measurement: "Failure Count",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchUpScanNanos *metric.Counter
RangeFeedBudgetExhausted *metric.Counter
RangefeedProcessorQueueTimeout *metric.Counter
RangeFeedBudgetBlocked *metric.Counter
RangeFeedRegistrations *metric.Gauge
RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
@@ -106,6 +113,7 @@ func (*Metrics) MetricStruct() {}
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
RangeFeedBudgetBlocked: metric.NewCounter(metaRangeFeedBudgetBlocked),
RangeFeedRegistrations: metric.NewGauge(metaRangeFeedRegistrations),
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -499,6 +499,7 @@ func (p *ScheduledProcessor) enqueueEventInternal(
case <-p.stoppedC:
// Already stopped. Do nothing.
case <-ctx.Done():
p.Metrics.RangefeedProcessorQueueTimeout.Inc(1)
p.sendStop(newErrBufferCapacityExceeded())
return false
}
@@ -528,6 +529,7 @@ func (p *ScheduledProcessor) enqueueEventInternal(
case <-ctx.Done():
// Sending on the eventC channel would have blocked.
// Instead, tear down the processor and return immediately.
p.Metrics.RangefeedProcessorQueueTimeout.Inc(1)
p.sendStop(newErrBufferCapacityExceeded())
return false
}
23 changes: 15 additions & 8 deletions pkg/raft/raft.go
@@ -1065,8 +1065,7 @@ func (r *raft) setTerm(term uint64) {
assertTrue(term > r.Term, "term cannot regress")
r.Term = term
r.Vote = None
r.lead = None
r.leadEpoch = 0
r.resetLead()
}

func (r *raft) setVote(id pb.PeerID) {
@@ -1087,7 +1086,7 @@ func (r *raft) setLead(lead pb.PeerID) {

func (r *raft) resetLead() {
r.lead = None
r.leadEpoch = 0
r.resetLeadEpoch()
}

func (r *raft) setLeadEpoch(leadEpoch pb.Epoch) {
@@ -1537,13 +1536,21 @@ func (r *raft) Step(m pb.Message) error {
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if IsMsgFromLeader(m.Type) {
// We've just received a message from a leader which was elected
// at a higher term. The old leader is no longer fortified, so it's
// safe to de-fortify at this point.
if IsMsgIndicatingLeader(m.Type) {
// We've just received a message that indicates that a new leader
// was elected at a higher term, but the message may not be from the
// leader itself. Either way, the old leader is no longer fortified,
// so it's safe to de-fortify at this point.
r.deFortify(m.From, m.Term)
r.becomeFollower(m.Term, m.From)
var lead pb.PeerID
if IsMsgFromLeader(m.Type) {
lead = m.From
}
r.becomeFollower(m.Term, lead)
} else {
// We've just received a message that does not indicate that a new
// leader was elected at a higher term. All it means is that some
// other peer has this term.
r.becomeFollower(m.Term, None)
}
}
155 changes: 155 additions & 0 deletions pkg/raft/testdata/snapshot_new_term.txt
@@ -0,0 +1,155 @@
# Test that creates a scenario where a peer learns about a new leadership term
# via a snapshot.

log-level none
----
ok

add-nodes 3 voters=(1,2,3) index=10
----
ok

# Elect 1 as leader.
campaign 1
----
ok

stabilize
----
ok

log-level debug
----
ok

raft-state
----
1: StateLeader (Voter) Term:1 Lead:1 LeadEpoch:1
2: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:1
3: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:1

# Transfer leadership to 2, without 3 hearing about it.
transfer-leadership from=1 to=2
----
INFO 1 [term 1] starts to transfer leadership to 2
INFO 1 sends MsgTimeoutNow to 2 immediately as 2 already has up-to-date log
INFO 1 became follower at term 1

stabilize 1 2
----
> 1 handling Ready
Ready MustSync=false:
State:StateFollower
Messages:
1->2 MsgTimeoutNow Term:1 Log:0/0
> 2 receiving messages
1->2 MsgTimeoutNow Term:1 Log:0/0
INFO 2 [term 1] received MsgTimeoutNow from 1 and starts an election to get leadership
INFO 2 is starting a new election at term 1
INFO 2 became candidate at term 2
INFO 2 [logterm: 1, index: 11] sent MsgVote request to 1 at term 2
INFO 2 [logterm: 1, index: 11] sent MsgVote request to 3 at term 2
> 2 handling Ready
Ready MustSync=true:
State:StateCandidate
HardState Term:2 Vote:2 Commit:11 Lead:0 LeadEpoch:0
Messages:
2->1 MsgVote Term:2 Log:1/11
2->3 MsgVote Term:2 Log:1/11
INFO 2 received MsgVoteResp from 2 at term 2
INFO 2 has received 1 MsgVoteResp votes and 0 vote rejections
> 1 receiving messages
2->1 MsgVote Term:2 Log:1/11
INFO 1 [term: 1] received a MsgVote message with higher term from 2 [term: 2]
INFO 1 became follower at term 2
INFO 1 [logterm: 1, index: 11, vote: 0] cast MsgVote for 2 [logterm: 1, index: 11] at term 2
> 1 handling Ready
Ready MustSync=true:
HardState Term:2 Vote:2 Commit:11 Lead:0 LeadEpoch:0
Messages:
1->2 MsgVoteResp Term:2 Log:0/0
> 2 receiving messages
1->2 MsgVoteResp Term:2 Log:0/0
INFO 2 received MsgVoteResp from 1 at term 2
INFO 2 has received 2 MsgVoteResp votes and 0 vote rejections
INFO 2 became leader at term 2
> 2 handling Ready
Ready MustSync=true:
State:StateLeader
HardState Term:2 Vote:2 Commit:11 Lead:2 LeadEpoch:1
Entries:
2/12 EntryNormal ""
Messages:
2->1 MsgFortifyLeader Term:2 Log:0/0
2->3 MsgFortifyLeader Term:2 Log:0/0
2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
2->3 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
> 1 receiving messages
2->1 MsgFortifyLeader Term:2 Log:0/0
2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
> 1 handling Ready
Ready MustSync=true:
HardState Term:2 Vote:2 Commit:11 Lead:2 LeadEpoch:1
Entries:
2/12 EntryNormal ""
Messages:
1->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1
1->2 MsgAppResp Term:2 Log:0/12 Commit:11
> 2 receiving messages
1->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1
1->2 MsgAppResp Term:2 Log:0/12 Commit:11
> 2 handling Ready
Ready MustSync=true:
HardState Term:2 Vote:2 Commit:12 Lead:2 LeadEpoch:1
CommittedEntries:
2/12 EntryNormal ""
Messages:
2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
2->1 MsgApp Term:2 Log:2/12 Commit:12
> 1 receiving messages
2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
2->1 MsgApp Term:2 Log:2/12 Commit:12
> 1 handling Ready
Ready MustSync=true:
HardState Term:2 Vote:2 Commit:12 Lead:2 LeadEpoch:1
CommittedEntries:
2/12 EntryNormal ""
Messages:
1->2 MsgAppResp Term:2 Log:0/12 Commit:11
1->2 MsgAppResp Term:2 Log:0/12 Commit:12
> 2 receiving messages
1->2 MsgAppResp Term:2 Log:0/12 Commit:11
1->2 MsgAppResp Term:2 Log:0/12 Commit:12

# Drop inflight messages to 3.
deliver-msgs drop=(3)
----
dropped: 2->3 MsgVote Term:2 Log:1/11
dropped: 2->3 MsgFortifyLeader Term:2 Log:0/0
dropped: 2->3 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]

# Send a manual snapshot from 2 to 3, which will be at term 2.
send-snapshot 2 3
----
2->3 MsgSnap Term:2 Log:0/0
Snapshot: Index:12 Term:2 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

stabilize
----
> 3 receiving messages
2->3 MsgSnap Term:2 Log:0/0
Snapshot: Index:12 Term:2 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO 3 [term: 1] received a MsgSnap message with higher term from 2 [term: 2]
INFO 3 became follower at term 2
INFO log [committed=11, applied=11, applying=11, unstable.offset=12, unstable.offsetInProgress=12, len(unstable.Entries)=0] starts to restore snapshot [index: 12, term: 2]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 12, lastindex: 12, lastterm: 2] restored snapshot [index: 12, term: 2]
INFO 3 [commit: 12] restored snapshot [index: 12, term: 2]
> 3 handling Ready
Ready MustSync=true:
HardState Term:2 Commit:12 Lead:0 LeadEpoch:0
Snapshot Index:12 Term:2 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
Messages:
3->2 MsgAppResp Term:2 Log:0/12 Commit:12
> 2 receiving messages
3->2 MsgAppResp Term:2 Log:0/12 Commit:12
20 changes: 20 additions & 0 deletions pkg/raft/util.go
@@ -49,6 +49,8 @@ var isResponseMsg = [...]bool{
pb.MsgFortifyLeaderResp: true,
}

// isMsgFromLeader contains message types that come from the leader of the
// message's term.
var isMsgFromLeader = [...]bool{
pb.MsgApp: true,
// TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of
@@ -60,6 +62,20 @@ var isMsgFromLeader = [...]bool{
pb.MsgDeFortifyLeader: true,
}

// isMsgIndicatingLeader contains message types that indicate that there is a
// leader at the message's term, even if the message is not from the leader
// itself.
//
// TODO(nvanbenschoten): remove this when we address the TODO above.
var isMsgIndicatingLeader = [...]bool{
pb.MsgApp: true,
pb.MsgSnap: true,
pb.MsgHeartbeat: true,
pb.MsgTimeoutNow: true,
pb.MsgFortifyLeader: true,
pb.MsgDeFortifyLeader: true,
}

func isMsgInArray(msgt pb.MessageType, arr []bool) bool {
i := int(msgt)
return i < len(arr) && arr[i]
Expand All @@ -77,6 +93,10 @@ func IsMsgFromLeader(msgt pb.MessageType) bool {
return isMsgInArray(msgt, isMsgFromLeader[:])
}

func IsMsgIndicatingLeader(msgt pb.MessageType) bool {
return isMsgInArray(msgt, isMsgIndicatingLeader[:])
}

func IsLocalMsgTarget(id pb.PeerID) bool {
return id == LocalAppendThread || id == LocalApplyThread
}