diff --git a/vertx-core/src/main/java/io/vertx/core/http/HttpServer.java b/vertx-core/src/main/java/io/vertx/core/http/HttpServer.java index c38fddd455c..aeccd5991c0 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/HttpServer.java +++ b/vertx-core/src/main/java/io/vertx/core/http/HttpServer.java @@ -144,12 +144,19 @@ default Future updateSSLOptions(ServerSSLOptions options) { Future updateSSLOptions(ServerSSLOptions options, boolean force); /** - * Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic - * shaping options. This update happens synchronously and at best effort for rate update to take effect immediately. + *

Update the server with new traffic {@code options}, the update happens if the options object is valid and different + * from the existing options object. + * + *

The {@code options} object is compared using its {@code equals} method against the existing options to prevent + * an update when the objects are equals since loading options can be costly, this can happen for share TCP servers. + * When object are equals, setting {@code force} to {@code true} forces the update. + * + *

The boolean succeeded future result indicates whether the update occurred. * * @param options the new traffic shaping options + * @return a future signaling the update success */ - void updateTrafficShapingOptions(TrafficShapingOptions options); + Future updateTrafficShapingOptions(TrafficShapingOptions options); /** * Tell the server to start listening. The server will listen on the port and host specified in the diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index 83a0743907f..a3c3b89b7f2 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -73,7 +73,7 @@ public Future updateSSLOptions(ServerSSLOptions options, boolean force) } @Override - public void updateTrafficShapingOptions(TrafficShapingOptions options) { + public Future updateTrafficShapingOptions(TrafficShapingOptions options) { NetServer s; synchronized (this) { s = tcpServer; @@ -81,7 +81,7 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) { if (s == null) { throw new IllegalStateException("Not listening"); } - s.updateTrafficShapingOptions(options); + return s.updateTrafficShapingOptions(options); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/net/NetServer.java b/vertx-core/src/main/java/io/vertx/core/net/NetServer.java index d2f9182e3c9..39a1dc4dcd2 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/NetServer.java +++ b/vertx-core/src/main/java/io/vertx/core/net/NetServer.java @@ -173,10 +173,17 @@ default Future updateSSLOptions(ServerSSLOptions options) { Future updateSSLOptions(ServerSSLOptions options, boolean force); /** - * Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic - * shaping options. This update happens synchronously and at best effort for rate update to take effect immediately. + *

Update the server with new traffic {@code options}, the update happens if the options object is valid and different + * from the existing options object. + * + *

The {@code options} object is compared using its {@code equals} method against the existing options to prevent + * an update when the objects are equals since loading options can be costly, this can happen for share TCP servers. + * When object are equals, setting {@code force} to {@code true} forces the update. + * + *

The boolean succeeded future result indicates whether the update occurred. * * @param options the new traffic shaping options + * @return a future signaling the update success */ - void updateTrafficShapingOptions(TrafficShapingOptions options); + Future updateTrafficShapingOptions(TrafficShapingOptions options); } diff --git a/vertx-core/src/main/java/io/vertx/core/net/TrafficShapingOptions.java b/vertx-core/src/main/java/io/vertx/core/net/TrafficShapingOptions.java index e84af6c589b..86e3d1601b3 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/TrafficShapingOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/net/TrafficShapingOptions.java @@ -220,4 +220,28 @@ public long getCheckIntervalForStats() { public TimeUnit getCheckIntervalForStatsTimeUnit() { return checkIntervalForStatsTimeUnit; } + + @Override + public boolean equals(Object obj) { + TrafficShapingOptions that = (TrafficShapingOptions) obj; + return inboundGlobalBandwidth == that.inboundGlobalBandwidth && + outboundGlobalBandwidth == that.outboundGlobalBandwidth && + peakOutboundGlobalBandwidth == that.peakOutboundGlobalBandwidth && + maxDelayToWait == that.maxDelayToWait && + maxDelayToWaitTimeUnit == that.maxDelayToWaitTimeUnit && + checkIntervalForStats == that.checkIntervalForStats && + checkIntervalForStatsTimeUnit == that.checkIntervalForStatsTimeUnit; + } + + @Override + public int hashCode() { + return Objects.hash(inboundGlobalBandwidth, + outboundGlobalBandwidth, + peakOutboundGlobalBandwidth, + maxDelayToWait, + maxDelayToWaitTimeUnit, + checkIntervalForStats, + checkIntervalForStatsTimeUnit); + } + } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index 04898d485eb..810a9f5d33d 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -349,21 +349,38 @@ public Future updateSSLOptions(ServerSSLOptions options, boolean force) } } - public void updateTrafficShapingOptions(TrafficShapingOptions options) { + public Future updateTrafficShapingOptions(TrafficShapingOptions options) { if (options == null) { throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update"); } - if (trafficShapingHandler == null) { - throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + - "to use traffic shaping during startup"); - } NetServerImpl server = actualServer; - if (server != null && server != this) { - server.updateTrafficShapingOptions(options); + ContextInternal ctx = vertx.getOrCreateContext(); + if (server == null) { + // Server not yet started + TrafficShapingOptions prev = this.options.getTrafficShapingOptions(); + boolean updated = prev == null || !prev.equals(options); + this.options.setTrafficShapingOptions(options); + return ctx.succeededFuture(updated); + } + // Update the traffic shaping options only for the actual/main server + if (server != this) { + return server.updateTrafficShapingOptions(options); } else { + Promise promise = ctx.promise(); + ctx.emit(v -> updateTrafficShapingOptions(options, promise)); + return promise.future(); + } + } + + public void updateTrafficShapingOptions(TrafficShapingOptions options, Promise promise) { + if (trafficShapingHandler == null) { + promise.fail(new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + + "to use traffic shaping during startup")); + } else if (!options.equals(this.options.getTrafficShapingOptions())) { + // Compare with existing traffic-shaping options to ensure they are updated only when they differ. + this.options.setTrafficShapingOptions(options); long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); - if (options.getPeakOutboundGlobalBandwidth() != 0) { trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); } @@ -371,6 +388,10 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) { long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); } + promise.complete(true); + } else { + log.info("Not updating traffic shaping options as they have not changed"); + promise.complete(false); } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpBandwidthLimitingTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpBandwidthLimitingTest.java index a4517af67bf..a9bdf27eff1 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpBandwidthLimitingTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpBandwidthLimitingTest.java @@ -14,16 +14,18 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Collectors; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.DeploymentOptions; -import io.vertx.core.Future; -import io.vertx.core.Promise; +import io.vertx.core.*; import io.vertx.core.http.*; import io.vertx.core.net.TrafficShapingOptions; import org.junit.After; @@ -35,8 +37,6 @@ import org.junit.runners.Parameterized.Parameters; import io.netty.handler.codec.http.HttpHeaderNames; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.test.core.TestUtils; @@ -204,6 +204,55 @@ public void start(Promise startPromise) { Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests } + @Test + public void testDynamicOutboundRateUpdateSharedServers() throws Exception { + int numEventLoops = 5; // We start a shared TCP server with 5 event-loops + List servers = Collections.synchronizedList(new ArrayList<>()); + vertx.deployVerticle(() -> ctx -> { + HttpServer testServer = serverFactory.apply(vertx); + servers.add(testServer); + return testServer + .requestHandler(HANDLERS.getFile(sampleF)) + .listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST); + }, new DeploymentOptions().setInstances(numEventLoops)) + .await(20, TimeUnit.SECONDS); + + // Apply traffic shaping options after the server has started + TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions() + .setInboundGlobalBandwidth(INBOUND_LIMIT) + .setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT); + + List> promises; + promises = servers + .stream() + .map(server -> server.updateTrafficShapingOptions(updatedTrafficOptions)) + .collect(Collectors.toList()); + // Ensure all traffic shaping updates complete before resolving the startPromise + Future.all(promises).await(20, TimeUnit.SECONDS); + + HttpClient testClient = clientFactory.apply(vertx); + CountDownLatch waitForResponse = new CountDownLatch(2); + AtomicLong startTime = new AtomicLong(); + AtomicLong totalReceivedLength = new AtomicLong(); + long expectedLength = Files.size(Paths.get(sampleF.getAbsolutePath())); + startTime.set(System.nanoTime()); + for (int i = 0; i < 2; i++) { + testClient.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/get-file") + .compose(req -> req.send() + .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) + .compose(HttpClientResponse::body)) + .onComplete(onSuccess(body -> { + long receivedBytes = body.getBytes().length; + totalReceivedLength.addAndGet(receivedBytes); + Assert.assertEquals(expectedLength, receivedBytes); + waitForResponse.countDown(); + })); + } + awaitLatch(waitForResponse); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()); + Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); + } + @Test public void testDynamicOutboundRateUpdate() throws Exception { Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE); diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java index d7eeb54c5e1..09fd1c29bd7 100644 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java @@ -341,16 +341,22 @@ public void testDynamicOutboundRateUpdate() { } @Test(expected = IllegalStateException.class) - public void testRateUpdateWhenServerStartedWithoutTrafficShaping() { + public void testRateUpdateWhenServerStartedWithoutTrafficShaping() throws Exception { Assume.assumeFalse(TRANSPORT == Transport.IO_URING); NetServerOptions options = new NetServerOptions().setHost(DEFAULT_HOST).setPort(DEFAULT_PORT); NetServer testServer = netServer(options); + testServer + .listen() + .await(20, TimeUnit.SECONDS); + // update inbound rate to twice the limit TrafficShapingOptions trafficOptions = new TrafficShapingOptions() .setOutboundGlobalBandwidth(OUTBOUND_LIMIT) .setInboundGlobalBandwidth(2 * INBOUND_LIMIT); - testServer.updateTrafficShapingOptions(trafficOptions); + testServer + .updateTrafficShapingOptions(trafficOptions) + .await(20, TimeUnit.SECONDS); } /**