diff --git a/src/main/java/io/nats/client/ForceReconnectOptions.java b/src/main/java/io/nats/client/ForceReconnectOptions.java index f0eb40344..1c9554537 100644 --- a/src/main/java/io/nats/client/ForceReconnectOptions.java +++ b/src/main/java/io/nats/client/ForceReconnectOptions.java @@ -24,17 +24,23 @@ public class ForceReconnectOptions { public static final ForceReconnectOptions FORCE_CLOSE_INSTANCE = ForceReconnectOptions.builder().forceClose().build(); private final boolean forceClose; + private final boolean fairServerDistribution; private final Duration flushWait; private ForceReconnectOptions(Builder b) { this.forceClose = b.forceClose; this.flushWait = b.flushWait; + this.fairServerDistribution = b.fairServerDistribution; } public boolean isForceClose() { return forceClose; } + public boolean isFairServerDistribution() { + return fairServerDistribution; + } + public boolean isFlush() { return flushWait != null; } @@ -56,6 +62,7 @@ public static Builder builder() { */ public static class Builder { boolean forceClose = false; + boolean fairServerDistribution = false; Duration flushWait; /** @@ -68,6 +75,17 @@ public Builder forceClose() { return this; } + /** + * When disabled, client will never try to force reconnect to currently connected server. + * If enabled, list of available servers will be shuffled, so ServerPool#nextServer might return currently + * connected server. + * @return the builder + */ + public Builder fairServerDistribution() { + this.fairServerDistribution = true; + return this; + } + /** * @param flushWait if supplied and at least 1 millisecond, the forceReconnect will try to * flush before closing for the specified wait time. Flush happens before close diff --git a/src/main/java/io/nats/client/ServerPool.java b/src/main/java/io/nats/client/ServerPool.java index c620308e0..5cb035214 100644 --- a/src/main/java/io/nats/client/ServerPool.java +++ b/src/main/java/io/nats/client/ServerPool.java @@ -78,4 +78,9 @@ public interface ServerPool { * @return the flag */ boolean hasSecureServer(); + + /** + * Randomly shuffle list of servers, so that `nextServer` returns arbitrary server from pool + */ + void shuffle(); } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 8d8fd44f5..bdc50ee2d 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -336,6 +336,9 @@ void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedExcepti closeSocketLock.unlock(); } + if (options != null && options.isFairServerDistribution()) { + serverPool.shuffle(); + } try { // calling connect just starts like a new connection versus reconnect // but we have to manually resubscribe like reconnect once it is connected diff --git a/src/main/java/io/nats/client/impl/NatsServerPool.java b/src/main/java/io/nats/client/impl/NatsServerPool.java index 6b2a6ae78..ff9376ef5 100644 --- a/src/main/java/io/nats/client/impl/NatsServerPool.java +++ b/src/main/java/io/nats/client/impl/NatsServerPool.java @@ -298,6 +298,11 @@ public boolean hasSecureServer() { return hasSecureServer; } + @Override + public void shuffle() { + Collections.shuffle(entryList, ThreadLocalRandom.current()); + } + protected int findEquivalent(List list, NatsUri toFind) { for (int i = 0; i < list.size(); i++) { NatsUri nuri = list.get(i); diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 7ae89ef17..ff6ac742a 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -721,6 +721,30 @@ public void testForceReconnect() throws Exception { runInJsCluster(tstOpts, (nc0, nc1, nc2) -> _testForceReconnect(nc0, listener)); } + @Test + void testForceReconnectWithFairDistribution() throws Exception { + ListenerForTesting listener = new ListenerForTesting(); + ThreeServerTestOptions tstOpts = makeThreeServerTestOptions(listener, false); + runInJsCluster(tstOpts, (nc0, nc1, nc2) -> { + for (int i = 0; i < 10; i++) { + listener.getConnectionEvents().clear(); + ServerInfo si = nc0.getServerInfo(); + String connectedServer = si.getServerId(); + + nc0.forceReconnect(ForceReconnectOptions.builder().fairServerDistribution().build()); + standardConnectionWait(nc0); + + si = nc0.getServerInfo(); + assertTrue(listener.getConnectionEvents().contains(Events.DISCONNECTED)); + assertTrue(listener.getConnectionEvents().contains(Events.RECONNECTED)); + if (connectedServer.equals(si.getServerId())) { + return; + } + } + fail("Expected that client will reconnect to the same server at least once"); + }); + } + @Test public void testForceReconnectWithAccount() throws Exception { ListenerForTesting listener = new ListenerForTesting(); @@ -774,23 +798,24 @@ public void testForceReconnectQueueBehaviorCheck() throws Exception { String subject = subject(); ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; - _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0); + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, true, 0); subject = subject(); ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; - _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait); + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, false, flushWait); subject = subject(); ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; - _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0); + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, false, 0); subject = subject(); ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; - _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait); + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, true, flushWait); }); } - private static void _testForceReconnectQueueCheck(String subject, int pubCount, int subscribeTime, int port, boolean forceClose, int flushWait) throws InterruptedException { + private static void _testForceReconnectQueueCheck(String subject, int pubCount, int subscribeTime, int port, + boolean forceClose, boolean fairDistribution, int flushWait) throws InterruptedException { ReconnectQueueCheckSubscriber subscriber = new ReconnectQueueCheckSubscriber(subject, pubCount, port); Thread tsub = new Thread(subscriber); tsub.start(); @@ -802,6 +827,9 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount, if (forceClose) { froBuilder.forceClose(); } + if (fairDistribution) { + froBuilder.fairServerDistribution(); + } ReconnectQueueCheckConnectionListener listener = new ReconnectQueueCheckConnectionListener(); diff --git a/src/test/java/io/nats/client/utils/CoverageServerPool.java b/src/test/java/io/nats/client/utils/CoverageServerPool.java index 4be92f84b..7f75e9abf 100644 --- a/src/test/java/io/nats/client/utils/CoverageServerPool.java +++ b/src/test/java/io/nats/client/utils/CoverageServerPool.java @@ -64,4 +64,8 @@ public List getServerList() { public boolean hasSecureServer() { return false; } + + @Override + public void shuffle() { + } }