Skip to content

Commit

Permalink
Fixes updates to traffic shaping options with shared servers
Browse files Browse the repository at this point in the history
This fixes the update-path for traffic shaping options to check for the existence of the traffic shaping handler only for the actual server. Currently, updating traffic-shaping options in a shared-server setting results in IllegalStateException, as the traffic-shaping handler is not set for the worker (non-main) servers.
  • Loading branch information
vietj committed Jan 13, 2025
1 parent 56a122a commit 58ab5c5
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 24 deletions.
13 changes: 10 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,19 @@ default Future<Boolean> updateSSLOptions(ServerSSLOptions options) {
Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force);

/**
* Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic
* shaping options. This update happens synchronously and at best effort for rate update to take effect immediately.
* <p>Update the server with new traffic {@code options}, the update happens if the options object is valid and different
* from the existing options object.
*
* <p>The {@code options} object is compared using its {@code equals} method against the existing options to prevent
* an update when the objects are equals since loading options can be costly, this can happen for share TCP servers.
* When object are equals, setting {@code force} to {@code true} forces the update.
*
* <p>The boolean succeeded future result indicates whether the update occurred.
*
* @param options the new traffic shaping options
* @return a future signaling the update success
*/
void updateTrafficShapingOptions(TrafficShapingOptions options);
Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options);

/**
* Tell the server to start listening. The server will listen on the port and host specified in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force)
}

@Override
public void updateTrafficShapingOptions(TrafficShapingOptions options) {
public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options) {
NetServer s;
synchronized (this) {
s = tcpServer;
}
if (s == null) {
throw new IllegalStateException("Not listening");
}
s.updateTrafficShapingOptions(options);
return s.updateTrafficShapingOptions(options);
}

@Override
Expand Down
13 changes: 10 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/net/NetServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,17 @@ default Future<Boolean> updateSSLOptions(ServerSSLOptions options) {
Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force);

/**
* Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic
* shaping options. This update happens synchronously and at best effort for rate update to take effect immediately.
* <p>Update the server with new traffic {@code options}, the update happens if the options object is valid and different
* from the existing options object.
*
* <p>The {@code options} object is compared using its {@code equals} method against the existing options to prevent
* an update when the objects are equals since loading options can be costly, this can happen for share TCP servers.
* When object are equals, setting {@code force} to {@code true} forces the update.
*
* <p>The boolean succeeded future result indicates whether the update occurred.
*
* @param options the new traffic shaping options
* @return a future signaling the update success
*/
void updateTrafficShapingOptions(TrafficShapingOptions options);
Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,28 @@ public long getCheckIntervalForStats() {
public TimeUnit getCheckIntervalForStatsTimeUnit() {
return checkIntervalForStatsTimeUnit;
}

@Override
public boolean equals(Object obj) {
TrafficShapingOptions that = (TrafficShapingOptions) obj;
return inboundGlobalBandwidth == that.inboundGlobalBandwidth &&
outboundGlobalBandwidth == that.outboundGlobalBandwidth &&
peakOutboundGlobalBandwidth == that.peakOutboundGlobalBandwidth &&
maxDelayToWait == that.maxDelayToWait &&
maxDelayToWaitTimeUnit == that.maxDelayToWaitTimeUnit &&
checkIntervalForStats == that.checkIntervalForStats &&
checkIntervalForStatsTimeUnit == that.checkIntervalForStatsTimeUnit;
}

@Override
public int hashCode() {
return Objects.hash(inboundGlobalBandwidth,
outboundGlobalBandwidth,
peakOutboundGlobalBandwidth,
maxDelayToWait,
maxDelayToWaitTimeUnit,
checkIntervalForStats,
checkIntervalForStatsTimeUnit);
}

}
37 changes: 29 additions & 8 deletions vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,28 +349,49 @@ public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force)
}
}

public void updateTrafficShapingOptions(TrafficShapingOptions options) {
public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options) {
if (options == null) {
throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update");
}
if (trafficShapingHandler == null) {
throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
"to use traffic shaping during startup");
}
NetServerImpl server = actualServer;
if (server != null && server != this) {
server.updateTrafficShapingOptions(options);
ContextInternal ctx = vertx.getOrCreateContext();
if (server == null) {
// Server not yet started
TrafficShapingOptions prev = this.options.getTrafficShapingOptions();
boolean updated = prev == null || !prev.equals(options);
this.options.setTrafficShapingOptions(options);
return ctx.succeededFuture(updated);
}
// Update the traffic shaping options only for the actual/main server
if (server != this) {
return server.updateTrafficShapingOptions(options);
} else {
Promise<Boolean> promise = ctx.promise();
ctx.emit(v -> updateTrafficShapingOptions(options, promise));
return promise.future();
}
}

public void updateTrafficShapingOptions(TrafficShapingOptions options, Promise<Boolean> promise) {
if (trafficShapingHandler == null) {
promise.fail(new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
"to use traffic shaping during startup"));
} else if (!options.equals(this.options.getTrafficShapingOptions())) {
// Compare with existing traffic-shaping options to ensure they are updated only when they differ.
this.options.setTrafficShapingOptions(options);
long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis);

if (options.getPeakOutboundGlobalBandwidth() != 0) {
trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth());
}
if (options.getMaxDelayToWait() != 0) {
long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait());
trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis);
}
promise.complete(true);
} else {
log.info("Not updating traffic shaping options as they have not changed");
promise.complete(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.*;
import io.vertx.core.http.*;
import io.vertx.core.net.TrafficShapingOptions;
import org.junit.After;
Expand All @@ -35,8 +37,6 @@
import org.junit.runners.Parameterized.Parameters;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.test.core.TestUtils;

Expand Down Expand Up @@ -204,6 +204,55 @@ public void start(Promise<Void> startPromise) {
Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests
}

@Test
public void testDynamicOutboundRateUpdateSharedServers() throws Exception {
int numEventLoops = 5; // We start a shared TCP server with 5 event-loops
List<HttpServer> servers = Collections.synchronizedList(new ArrayList<>());
vertx.deployVerticle(() -> ctx -> {
HttpServer testServer = serverFactory.apply(vertx);
servers.add(testServer);
return testServer
.requestHandler(HANDLERS.getFile(sampleF))
.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST);
}, new DeploymentOptions().setInstances(numEventLoops))
.await(20, TimeUnit.SECONDS);

// Apply traffic shaping options after the server has started
TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions()
.setInboundGlobalBandwidth(INBOUND_LIMIT)
.setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT);

List<Future<?>> promises;
promises = servers
.stream()
.map(server -> server.updateTrafficShapingOptions(updatedTrafficOptions))
.collect(Collectors.toList());
// Ensure all traffic shaping updates complete before resolving the startPromise
Future.all(promises).await(20, TimeUnit.SECONDS);

HttpClient testClient = clientFactory.apply(vertx);
CountDownLatch waitForResponse = new CountDownLatch(2);
AtomicLong startTime = new AtomicLong();
AtomicLong totalReceivedLength = new AtomicLong();
long expectedLength = Files.size(Paths.get(sampleF.getAbsolutePath()));
startTime.set(System.nanoTime());
for (int i = 0; i < 2; i++) {
testClient.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/get-file")
.compose(req -> req.send()
.andThen(onSuccess(resp -> assertEquals(200, resp.statusCode())))
.compose(HttpClientResponse::body))
.onComplete(onSuccess(body -> {
long receivedBytes = body.getBytes().length;
totalReceivedLength.addAndGet(receivedBytes);
Assert.assertEquals(expectedLength, receivedBytes);
waitForResponse.countDown();
}));
}
awaitLatch(waitForResponse);
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get());
Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT));
}

@Test
public void testDynamicOutboundRateUpdate() throws Exception {
Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,16 +341,22 @@ public void testDynamicOutboundRateUpdate() {
}

@Test(expected = IllegalStateException.class)
public void testRateUpdateWhenServerStartedWithoutTrafficShaping() {
public void testRateUpdateWhenServerStartedWithoutTrafficShaping() throws Exception {
Assume.assumeFalse(TRANSPORT == Transport.IO_URING);
NetServerOptions options = new NetServerOptions().setHost(DEFAULT_HOST).setPort(DEFAULT_PORT);
NetServer testServer = netServer(options);

testServer
.listen()
.await(20, TimeUnit.SECONDS);

// update inbound rate to twice the limit
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
.setOutboundGlobalBandwidth(OUTBOUND_LIMIT)
.setInboundGlobalBandwidth(2 * INBOUND_LIMIT);
testServer.updateTrafficShapingOptions(trafficOptions);
testServer
.updateTrafficShapingOptions(trafficOptions)
.await(20, TimeUnit.SECONDS);
}

/**
Expand Down

0 comments on commit 58ab5c5

Please sign in to comment.