Skip to content

Commit

Permalink
- specs: more flaky/stuck tests
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jan 8, 2025
1 parent abe484c commit ce29b7e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 117 deletions.
174 changes: 59 additions & 115 deletions spec/client_cluster_reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@
it "should connect to another server if possible before reconnect" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS.connect(servers: [@s1.uri, @s2.uri], dont_randomize_servers: true)

Expand All @@ -100,9 +99,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

msgs = []
Expand All @@ -119,9 +116,7 @@
sleep 0.1
end

mon.synchronize do
reconnected.wait(1)
end
expect(reconnected.wait_for(1)).to eq :ok
expect(nats.connected_server).to eql(@s2.uri)
nats.close

Expand All @@ -133,8 +128,7 @@
it "should connect to another server if possible before reconnect using multiple uris" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
nats.connect("nats://secret:[email protected]:4242,nats://secret:[email protected]:4243", dont_randomize_servers: true)
Expand All @@ -152,9 +146,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

msgs = []
Expand All @@ -171,9 +163,7 @@
sleep 0.1
end

mon.synchronize do
reconnected.wait(1)
end
expect(reconnected.wait_for(1)).to eq :ok
expect(nats.connected_server.to_s).to eql(@s2.uri.to_s)
nats.close

Expand All @@ -185,8 +175,7 @@
it "should gracefully reconnect to another available server while publishing" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
nats.connect({
Expand All @@ -207,56 +196,40 @@
reconnects = 0
nats.on_reconnect do |s|
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
nats.on_error do |e|
errors << e
end

msgs = []
msg_counter = 0
nats.subscribe("hello.*") do |msg|
msgs << msg
msg_counter += 1
if msg_counter == 100
@s1.kill_server
end
end
nats.flush
expect(nats.connected_server.to_s).to eql(@s1.uri.to_s)

msg_payload = "A" * 10_000
1000.times do |n|
# Receive 100 messages initially and then failover
if n == 100
nats.flush

# Wait a bit for all messages
sleep 0.5
expect(msgs.count).to eql(100)
@s1.kill_server
elsif n % 100 == 0
# yield a millisecond
sleep 0.001
end

# Messages sent here can be lost
msg_payload = "A" * 1_000
100.times do |n|
nats.publish("hello.#{n}", msg_payload)
end

# Flush everything we have sent so far
nats.flush(5)
errors = []
errors.each do |e|
errors << e
end
mon.synchronize { reconnected.wait(1) }

expect(reconnected.wait_for(2)).to eq :ok
expect(nats.connected_server).to eql(@s2.uri)
nats.close

expect(reconnects).to eql(1)
expect(disconnects).to eql(2)
expect(closes).to eql(1)
expect(errors).to be_empty
expect(errors.size).to eq(1)
end
end

Expand All @@ -267,18 +240,20 @@
end

after do
@s1.kill_server
[@s1, @s2, @s3].each do |s|
s.kill_server
end
end

it "should reconnect to nodes discovered from seed server" do
# Nodes join to cluster before we try to connect
[@s2, @s3].each do |s|
s.start_server(true)
context "with nodes joined before first connect" do
before do
[@s2, @s3].each do |s|
s.start_server(true)
end
end

begin
mon = Monitor.new
reconnected = mon.new_cond
it "should reconnect to nodes discovered from seed server" do
reconnected = Future.new

nats = NATS::IO::Client.new
disconnects = 0
Expand All @@ -294,9 +269,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
Expand All @@ -309,9 +282,8 @@
expect(nats.connected_server).to eql(@s1.uri)
@s1.kill_server
sleep 0.2
mon.synchronize do
reconnected.wait(5)
end

reconnected.wait_for(3)

# Reconnected...
# expect(nats.connected_server).to eql(@s2.uri)
Expand All @@ -325,23 +297,11 @@
expect(nats.last_error).to eql(nil)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
end

it "should reconnect to nodes discovered from seed server with single uri" do
skip "FIXME: flaky test"

# Nodes join to cluster before we try to connect
[@s2, @s3].each do |s|
s.start_server(true)
end
it "should reconnect to nodes discovered from seed server with single uri" do
skip "FIXME: flaky test"

begin
mon = Monitor.new
reconnected = mon.new_cond

Expand Down Expand Up @@ -390,17 +350,11 @@
expect(nats.last_error).to eql(nil)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
end

it "should reconnect to nodes discovered in the cluster after first connect" do
mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
disconnects = 0
Expand All @@ -416,9 +370,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
Expand All @@ -436,42 +388,34 @@
})
expect(nats.connected_server).to eql(@s1.uri)

begin
# Couple of servers join...
[@s2, @s3].each do |s|
s.start_server(true)
end
nats.flush
# Couple of servers join...
[@s2, @s3].each do |s|
s.start_server(true)
end
nats.flush

# Wait for a bit before disconnecting from original server
nats.flush
@s1.kill_server
mon.synchronize do
reconnected.wait(3)
end
# Wait for a bit before disconnecting from original server
nats.flush
@s1.kill_server

# We still consider the original node and we have new ones
# which can be used to failover.
expect(nats.servers.count).to eql(3)
reconnected.wait_for(3)

# Only 2 new ones should be discovered servers even after reconnect
expect(nats.discovered_servers.count).to eql(2)
expect(nats.connected_server).to eql(@s2.uri)
expect(reconnects).to eql(1)
expect(disconnects).to eql(1)
expect(closes).to eql(0)
expect(errors.count).to eql(2)
expect(errors.first).to be_a(Errno::ECONNRESET)
expect(errors.last).to be_a(Errno::ECONNREFUSED)
expect(nats.last_error).to be_a(Errno::ECONNREFUSED)
# We still consider the original node and we have new ones
# which can be used to failover.
expect(nats.servers.count).to eql(3)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
# Only 2 new ones should be discovered servers even after reconnect
expect(nats.discovered_servers.count).to eql(2)
expect(nats.connected_server).to eql(@s2.uri)
expect(reconnects).to eql(1)
expect(disconnects).to eql(1)
expect(closes).to eql(0)
expect(errors.count).to eql(2)
expect(errors.first).to be_a(Errno::ECONNRESET)
expect(errors.last).to be_a(Errno::ECONNREFUSED)
expect(nats.last_error).to be_a(Errno::ECONNREFUSED)

nats.close
end
end
end
4 changes: 2 additions & 2 deletions spec/client_drain_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@
# Start draining process asynchronously.
nc.drain

# Release the queue (we have 38 messages left)
# Release the queue
80.times { sub_queue.push(Future.new) }
result = future.wait_for(2)
result = future.wait_for(5)
expect(result).to eql(:closed)
expect(wait_reqs.wait_for(2)).to eql(:ok)
end
Expand Down

0 comments on commit ce29b7e

Please sign in to comment.