Skip to content

Commit

Permalink
Use Netty default allocator whenever it is pooled otherwise fallback …
Browse files Browse the repository at this point in the history
…to adaptive allocator.

Motivation:

Vert.x should use Netty's default allocator whenever possible in order to minimize the resources for pooled allocation (thread-local direct buffers, arenas).

Changes:

VertxByteBufAllocator.POOLED_ALLOCATOR reuses ByteBufAllocator.DEFAULT when it is pooled otherwise uses AdaptiveByteBufAllocator.DEFAULT.

TCP server/client should use VertxByteBufAllocator.POOLED_ALLOCATOR instead of PooledByteBufAllocator.DEFAULT.
  • Loading branch information
vietj committed Oct 29, 2024
1 parent d49eb4c commit ccce366
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
*/
package io.vertx.core.impl.buffer;

import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.*;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.buffer.impl.VertxHeapByteBuf;
import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf;
Expand All @@ -24,7 +20,16 @@ public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator {
/**
* Vert.x pooled allocator.
*/
public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true);
public static final ByteBufAllocator POOLED_ALLOCATOR;

static {
ByteBufAllocator pooledAllocator = ByteBufAllocator.DEFAULT;
if (!pooledAllocator.isDirectBufferPooled()) {
// When io.netty.allocator.type == unpooled
pooledAllocator = AdaptiveByteBufAllocator.DEFAULT;
}
POOLED_ALLOCATOR = pooledAllocator;
}

/**
* Vert.x shared un-pooled allocator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.CloseSequence;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetClientInternal;
Expand Down Expand Up @@ -282,7 +282,7 @@ private void connectInternal2(ConnectOptions connectOptions,
Objects.requireNonNull(connectHandler, "No null connectHandler accepted");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.option(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);

SocketAddress remoteAddress = connectOptions.getRemoteAddress();
if (remoteAddress == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.core.net.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
Expand All @@ -31,6 +30,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.tls.SslContextManager;
Expand Down Expand Up @@ -508,7 +508,7 @@ private void bind(
if (options.isSsl()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
} else {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);
}

bootstrap.childHandler(channelBalancer);
Expand Down
28 changes: 26 additions & 2 deletions vertx-core/src/test/java/io/vertx/tests/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

package io.vertx.tests.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand All @@ -28,6 +27,8 @@
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -4602,4 +4603,27 @@ public void testConnectToServerShutdown() throws Exception {
assertWaitUntil(closed::get);
fut.await();
}

@Test
public void testDefaultPooledAllocator() {
server.connectHandler(so -> {
NetSocketInternal soi = (NetSocketInternal) so;
soi.messageHandler(msg -> {
try {
ByteBuf bbuf = (ByteBuf) msg;
System.out.println(bbuf.alloc());
assertSame(ByteBufAllocator.DEFAULT, bbuf.alloc());
} finally {
ReferenceCountUtil.release(msg);
}
testComplete();
});
});
server
.listen(1234, "localhost")
.await();
NetSocket so = client.connect(testAddress).await();
so.write(Buffer.buffer("ping"));
await();
}
}

0 comments on commit ccce366

Please sign in to comment.