From 068293cc2e009250bca26051ef8e2b579307e6d0 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 18 Oct 2024 15:47:40 +0200 Subject: [PATCH 1/3] Unix Domain Socket support with JDK16+ Since Netty 4.1.110.Final, Unix Domain Sockets are available with NIO transport. https://netty.io/news/2024/05/22/4-1-110-Final.html This changes the Vert.x JDKTransport to support Unix Domain Sockets without native transports when a JDK16+ is used. Signed-off-by: Thomas Segismont --- .../core/impl/transports/NioTransport.java | 52 +++++++++++-- .../UnixDomainSocketJdkSupport.java | 78 +++++++++++++++++++ .../vertx/core/spi/transport/Transport.java | 14 ++-- .../io/vertx/it/transport/TransportTest.java | 13 +++- .../java/io/vertx/tests/http/HttpTest.java | 43 ++++++---- .../test/java/io/vertx/tests/net/NetTest.java | 71 +++++++++-------- 6 files changed, 201 insertions(+), 70 deletions(-) create mode 100644 vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index 51068bd60f2..9a6d609eff6 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -10,30 +10,59 @@ */ package io.vertx.core.impl.transports; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.ServerChannel; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.nio.*; import io.vertx.core.spi.transport.Transport; +import java.net.SocketAddress; + public class NioTransport implements Transport { /** * The NIO transport, always there. */ public static final Transport INSTANCE = new NioTransport(); + private final UnixDomainSocketJdkSupport unixDomainSocketJdkSupport = UnixDomainSocketJdkSupport.load(); + + @Override + public boolean supportsDomainSockets() { + return unixDomainSocketJdkSupport != null; + } + + @Override + public SocketAddress convert(io.vertx.core.net.SocketAddress address) { + if (address.isDomainSocket() && unixDomainSocketJdkSupport != null) { + return unixDomainSocketJdkSupport.convert(address); + } else { + return Transport.super.convert(address); + } + } + + @Override + public io.vertx.core.net.SocketAddress convert(SocketAddress address) { + if (unixDomainSocketJdkSupport != null && unixDomainSocketJdkSupport.isUnixDomainSocketAddress(address)) { + return unixDomainSocketJdkSupport.convert(address); + } + return Transport.super.convert(address); + } + @Override public IoHandlerFactory ioHandlerFactory() { return NioIoHandler.newFactory(); } + @Override public DatagramChannel datagramChannel() { return new NioDatagramChannel(); } + @Override public DatagramChannel datagramChannel(InternetProtocolFamily family) { switch (family) { case IPv4: @@ -45,16 +74,25 @@ public DatagramChannel datagramChannel(InternetProtocolFamily family) { } } + @Override public ChannelFactory channelFactory(boolean domainSocket) { if (domainSocket) { - throw new IllegalArgumentException("The Vertx instance must be created with the preferNativeTransport option set to true to create domain sockets"); + if (unixDomainSocketJdkSupport == null) { + throw new IllegalArgumentException("Domain sockets require JDK 16 and above, or the usage of a native transport"); + } + return NioDomainSocketChannel::new; + } else { + return NioSocketChannel::new; } - return NioSocketChannel::new; } + @Override public ChannelFactory serverChannelFactory(boolean domainSocket) { if (domainSocket) { - throw new IllegalArgumentException("The Vertx instance must be created with the preferNativeTransport option set to true to create domain sockets"); + if (unixDomainSocketJdkSupport == null) { + throw new IllegalArgumentException("Domain sockets require JDK 16 and above, or the usage of a native transport"); + } + return NioServerDomainSocketChannel::new; } return NioServerSocketChannel::new; } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java new file mode 100644 index 00000000000..16c61c95ed6 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java @@ -0,0 +1,78 @@ +package io.vertx.core.impl.transports; + +import io.netty.util.internal.PlatformDependent; +import io.vertx.core.internal.logging.Logger; +import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.net.impl.SocketAddressImpl; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.net.SocketAddress; +import java.nio.file.Path; + +import static java.lang.invoke.MethodType.methodType; + +class UnixDomainSocketJdkSupport { + + private static final Logger LOG = LoggerFactory.getLogger(UnixDomainSocketJdkSupport.class); + + private final Class unixDomainSocketAddressClass; + private final MethodHandle ofMethodHandle; + private final MethodHandle getPathMethodHandle; + + private UnixDomainSocketJdkSupport(Class unixDomainSocketAddressClass, MethodHandle ofMethodHandle, MethodHandle getPathMethodHandle) { + this.unixDomainSocketAddressClass = unixDomainSocketAddressClass; + this.ofMethodHandle = ofMethodHandle; + this.getPathMethodHandle = getPathMethodHandle; + } + + static UnixDomainSocketJdkSupport load() { + Class unixDomainSocketAddressClass; + MethodHandle ofMethodHandle; + MethodHandle getPathMethodHandle; + if (PlatformDependent.javaVersion() >= 16) { + try { + unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + ofMethodHandle = lookup.findStatic(unixDomainSocketAddressClass, "of", methodType(unixDomainSocketAddressClass, Path.class)); + getPathMethodHandle = lookup.findVirtual(unixDomainSocketAddressClass, "getPath", methodType(Path.class)); + return new UnixDomainSocketJdkSupport(unixDomainSocketAddressClass, ofMethodHandle, getPathMethodHandle); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e) { + LOG.warn("JDK Unix Domain Socket support is not available", e); + } + } + return null; + } + + SocketAddress convert(io.vertx.core.net.SocketAddress address) { + try { + return (SocketAddress) ofMethodHandle.invoke(Path.of(address.path())); + } catch (Throwable cause) { + rethrowIfPossible(cause); + throw new LinkageError("java.net.UnixDomainSocketAddress.of not available", cause); + } + } + + boolean isUnixDomainSocketAddress(SocketAddress address) { + return unixDomainSocketAddressClass.isAssignableFrom(address.getClass()); + } + + io.vertx.core.net.SocketAddress convert(SocketAddress address) { + try { + Path path = (Path) getPathMethodHandle.invoke(address); + return new SocketAddressImpl(path.toAbsolutePath().toString()); + } catch (Throwable cause) { + rethrowIfPossible(cause); + throw new LinkageError("java.net.UnixDomainSocketAddress.getPath not available", cause); + } + } + + private static void rethrowIfPossible(Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java index 747a7c6fb06..45bd360439d 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java @@ -16,18 +16,14 @@ import io.netty.channel.*; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.InternetProtocolFamily; +import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator; import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.net.ClientOptionsBase; import io.vertx.core.net.NetServerOptions; -import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator; import io.vertx.core.net.impl.SocketAddressImpl; -import io.vertx.core.impl.transports.NioTransport; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.NetworkInterface; -import java.net.SocketAddress; -import java.net.SocketException; +import java.net.*; import java.util.concurrent.ThreadFactory; /** @@ -65,7 +61,7 @@ default Throwable unavailabilityCause() { default SocketAddress convert(io.vertx.core.net.SocketAddress address) { if (address.isDomainSocket()) { - throw new IllegalArgumentException("Domain socket are not supported by NIO transport, you need to use native transport to use them"); + throw new IllegalArgumentException("Domain sockets require JDK 16 and above, or the usage of a native transport"); } else { InetAddress ip = ((SocketAddressImpl) address).ipAddress(); if (ip != null) { @@ -175,8 +171,8 @@ default void configure(ClientOptionsBase options, int connectTimeout, boolean do } default void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) { - bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress()); if (!domainSocket) { + bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress()); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); bootstrap.childOption(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); } diff --git a/vertx-core/src/test/java/io/vertx/it/transport/TransportTest.java b/vertx-core/src/test/java/io/vertx/it/transport/TransportTest.java index 9c67eddd595..3931ee54698 100644 --- a/vertx-core/src/test/java/io/vertx/it/transport/TransportTest.java +++ b/vertx-core/src/test/java/io/vertx/it/transport/TransportTest.java @@ -11,6 +11,7 @@ package io.vertx.it.transport; +import io.netty.util.internal.PlatformDependent; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.net.NetClient; @@ -22,6 +23,8 @@ import java.io.File; +import static org.junit.Assume.assumeTrue; + /** * @author Julien Viet */ @@ -31,7 +34,9 @@ public class TransportTest extends AsyncTestBase { @Override protected void tearDown() throws Exception { - close(vertx); + if (vertx != null) { + close(vertx); + } super.tearDown(); } @@ -86,6 +91,7 @@ private void testNetServer(VertxOptions options) throws InterruptedException { @Test public void testDomainSocketServer() throws Exception { + assumeTrue(PlatformDependent.javaVersion() < 16); File sock = TestUtils.tmpFile(".sock"); vertx = Vertx.vertx(); NetServer server = vertx.createNetServer(); @@ -93,7 +99,7 @@ public void testDomainSocketServer() throws Exception { server .listen(SocketAddress.domainSocketAddress(sock.getAbsolutePath())) .onComplete(onFailure(err -> { - assertEquals(err.getClass(), IllegalArgumentException.class); + assertEquals(IllegalArgumentException.class, err.getClass()); testComplete(); })); await(); @@ -101,12 +107,13 @@ public void testDomainSocketServer() throws Exception { @Test public void testDomainSocketClient() throws Exception { + assumeTrue(PlatformDependent.javaVersion() < 16); File sock = TestUtils.tmpFile(".sock"); vertx = Vertx.vertx(); NetClient client = vertx.createNetClient(); client.connect(SocketAddress.domainSocketAddress(sock.getAbsolutePath())) .onComplete(onFailure(err -> { - assertEquals(err.getClass(), IllegalArgumentException.class); + assertEquals(IllegalArgumentException.class, err.getClass()); testComplete(); })); await(); diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index 40861dd2ad0..117e08b0651 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -25,11 +25,11 @@ import io.vertx.core.http.*; import io.vertx.core.http.impl.CleanableHttpClient; import io.vertx.core.http.impl.HttpClientImpl; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.http.HttpServerRequestInternal; import io.vertx.core.http.impl.ServerCookie; import io.vertx.core.http.impl.headers.HeadersMultiMap; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.internal.http.HttpServerRequestInternal; import io.vertx.core.net.*; import io.vertx.core.net.impl.HAProxyMessageCompletionHandler; import io.vertx.core.streams.ReadStream; @@ -68,6 +68,8 @@ import static io.vertx.test.core.AssertExpectations.that; import static io.vertx.test.core.TestUtils.*; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; /** * @author Julien Viet @@ -184,10 +186,21 @@ public void testListenSocketAddress() throws Exception { } @Test - public void testListenDomainSocketAddress() throws Exception { - Vertx vx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); - Assume.assumeTrue("Native transport must be enabled", vx.isNativeTransportEnabled()); - Assume.assumeTrue("Transport must support domain sockets", ((VertxInternal) vx).transport().supportsDomainSockets()); + public void testListenDomainSocketAddressNative() throws Exception { + VertxInternal vx = (VertxInternal) Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); + assumeTrue("Native transport must be enabled", vx.isNativeTransportEnabled()); + testListenDomainSocketAddress(vx); + } + + @Test + public void testListenDomainSocketAddressJdk() throws Exception { + VertxInternal vx = (VertxInternal) Vertx.vertx(new VertxOptions().setPreferNativeTransport(false)); + assumeFalse("Native transport must not be enabled", vx.isNativeTransportEnabled()); + testListenDomainSocketAddress(vx); + } + + private void testListenDomainSocketAddress(VertxInternal vx) throws Exception { + assumeTrue("Transport must support domain sockets", vx.transport().supportsDomainSockets()); int len = 3; waitFor(len * len); List addresses = new ArrayList<>(); @@ -277,7 +290,7 @@ public void testLowerCaseHeaders() throws Exception { @Test public void testServerActualPortWhenSet() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); server.close(); server .requestHandler(request -> { @@ -303,7 +316,7 @@ public void testServerActualPortWhenSet() throws Exception { @Test public void testServerActualPortWhenZero() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); server.close(); server = vertx.createHttpServer(createBaseServerOptions().setPort(0).setHost(DEFAULT_HTTP_HOST)); server @@ -330,7 +343,7 @@ public void testServerActualPortWhenZero() throws Exception { @Test public void testServerActualPortWhenZeroPassedInListen() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); server.close(); server = vertx.createHttpServer(new HttpServerOptions(createBaseServerOptions()).setHost(DEFAULT_HTTP_HOST)); server @@ -357,7 +370,7 @@ public void testServerActualPortWhenZeroPassedInListen() throws Exception { @Test public void testClientRequestOptionsSocketAddressOnly() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); Integer port = requestOptions.getPort(); String host = requestOptions.getHost(); server @@ -4243,7 +4256,7 @@ public void testFollowRedirectSendHeadThenBody() throws Exception { @Test public void testFollowRedirectLimit() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); AtomicInteger numberOfRequests = new AtomicInteger(); server.requestHandler(req -> { int val = numberOfRequests.incrementAndGet(); @@ -4270,7 +4283,7 @@ public void testFollowRedirectLimit() throws Exception { @Test public void testFollowRedirectPropagatesTimeout() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); AtomicInteger redirections = new AtomicInteger(); server.requestHandler(req -> { switch (redirections.getAndIncrement()) { @@ -4491,7 +4504,7 @@ class MockResp implements HttpClientResponse { @Test public void testFollowRedirectEncodedParams() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); String value1 = "\ud55c\uae00", value2 = "A B+C", value3 = "123 \u20ac"; server.requestHandler(req -> { switch (req.path()) { @@ -6361,7 +6374,7 @@ public void testHAProxyProtocolVersion1TCP6() throws Exception { @Test public void testHAProxyProtocolVersion1Unknown() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); Buffer header = HAProxy.createVersion1UnknownProtocolHeader(); testHAProxyProtocolAccepted(header, null, null); } @@ -6392,7 +6405,7 @@ public void testHAProxyProtocolVersion2UnixSocket() throws Exception { @Test public void testHAProxyProtocolVersion2Unknown() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); Buffer header = HAProxy.createVersion2UnknownProtocolHeader(); testHAProxyProtocolAccepted(header, null, null); } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index 0bc079bf52b..aea819b034d 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -11,47 +11,41 @@ package io.vertx.tests.net; -import io.netty.buffer.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.*; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.PlatformDependent; +import io.vertx.core.Future; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; -import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.*; import io.vertx.core.impl.Utils; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.internal.net.NetClientInternal; +import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.net.*; -import io.vertx.core.net.impl.*; -import io.vertx.core.internal.net.NetSocketInternal; +import io.vertx.core.net.impl.CleanableNetClient; +import io.vertx.core.net.impl.HAProxyMessageCompletionHandler; +import io.vertx.core.net.impl.NetServerImpl; +import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.spi.tls.SslContextFactory; import io.vertx.test.core.CheckingSender; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import io.vertx.test.netty.TestLoggerFactory; -import io.vertx.test.proxy.HAProxy; -import io.vertx.test.proxy.HttpProxy; -import io.vertx.test.proxy.Socks4Proxy; -import io.vertx.test.proxy.SocksProxy; -import io.vertx.test.proxy.TestProxyBase; +import io.vertx.test.proxy.*; import io.vertx.test.tls.Cert; import io.vertx.test.tls.Trust; import org.junit.Assume; @@ -60,23 +54,14 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.*; import java.io.*; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.security.KeyStore; import java.security.cert.Certificate; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -86,11 +71,13 @@ import java.util.function.LongPredicate; import java.util.function.Supplier; +import static io.vertx.test.core.TestUtils.*; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTPS_HOST; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTPS_PORT; -import static io.vertx.test.core.TestUtils.*; import static io.vertx.tests.tls.HttpTLSTest.testPeerHostServerCert; import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; /** * @author Tim Fox @@ -1387,7 +1374,7 @@ public void testSpecificTlsProtocolVersion() throws Exception { @Test public void testTLSTrailingDotHost() throws Exception { - Assume.assumeTrue(PlatformDependent.javaVersion() < 9); + assumeTrue(PlatformDependent.javaVersion() < 9); // We just need a vanilla cert for this test SelfSignedCertificate cert = SelfSignedCertificate.create("host2.com"); TLSTest test = new TLSTest() @@ -1971,9 +1958,21 @@ public void start(Promise startPromise) { } @Test - public void testListenDomainSocketAddress() throws Exception { + public void testListenDomainSocketAddressNative() throws Exception { VertxInternal vx = (VertxInternal) Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); - Assume.assumeTrue("Transport must support domain sockets", vx.transport().supportsDomainSockets()); + assumeTrue("Native transport must be enabled", vx.isNativeTransportEnabled()); + testListenDomainSocketAddress(vx); + } + + @Test + public void testListenDomainSocketAddressJdk() throws Exception { + VertxInternal vx = (VertxInternal) Vertx.vertx(new VertxOptions().setPreferNativeTransport(false)); + assumeFalse("Native transport must not be enabled", vx.isNativeTransportEnabled()); + testListenDomainSocketAddress(vx); + } + + private void testListenDomainSocketAddress(VertxInternal vx) throws Exception { + assumeTrue("Transport must support domain sockets", vx.transport().supportsDomainSockets()); int len = 3; waitFor(len * len); List addresses = new ArrayList<>(); @@ -3371,7 +3370,7 @@ public void testClientLocalAddress() { @Test public void testSelfSignedCertificate() throws Exception { - Assume.assumeTrue(PlatformDependent.javaVersion() < 9); + assumeTrue(PlatformDependent.javaVersion() < 9); CountDownLatch latch = new CountDownLatch(2); @@ -4181,7 +4180,7 @@ public void testHAProxyProtocolIdleTimeoutNotHappened() throws Exception { @Test public void testHAProxyProtocolConnectSSL() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); waitFor(2); SocketAddress remote = SocketAddress.inetSocketAddress(56324, "192.168.0.1"); SocketAddress local = SocketAddress.inetSocketAddress(443, "192.168.0.11"); @@ -4243,7 +4242,7 @@ public void testHAProxyProtocolVersion1TCP6() throws Exception { @Test public void testHAProxyProtocolVersion1Unknown() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); Buffer header = HAProxy.createVersion1UnknownProtocolHeader(); testHAProxyProtocolAccepted(header, null, null); } @@ -4274,7 +4273,7 @@ public void testHAProxyProtocolVersion2UnixSocket() throws Exception { @Test public void testHAProxyProtocolVersion2Unknown() throws Exception { - Assume.assumeTrue(testAddress.isInetSocket()); + assumeTrue(testAddress.isInetSocket()); Buffer header = HAProxy.createVersion2UnknownProtocolHeader(); testHAProxyProtocolAccepted(header, null, null); } From b5954664cf87883f9598000f4aaaba246838c4aa Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 29 Oct 2024 16:47:44 +0100 Subject: [PATCH 2/3] Rename UnixDomainSocketJdkSupport to UnixDomainSocketNioTransport Signed-off-by: Thomas Segismont --- .../vertx/core/impl/transports/NioTransport.java | 16 ++++++++-------- ...rt.java => UnixDomainSocketNioTransport.java} | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) rename vertx-core/src/main/java/io/vertx/core/impl/transports/{UnixDomainSocketJdkSupport.java => UnixDomainSocketNioTransport.java} (87%) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index 9a6d609eff6..5cb69a6ca41 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -28,17 +28,17 @@ public class NioTransport implements Transport { */ public static final Transport INSTANCE = new NioTransport(); - private final UnixDomainSocketJdkSupport unixDomainSocketJdkSupport = UnixDomainSocketJdkSupport.load(); + private final UnixDomainSocketNioTransport unixDomainSocketNioTransport = UnixDomainSocketNioTransport.load(); @Override public boolean supportsDomainSockets() { - return unixDomainSocketJdkSupport != null; + return unixDomainSocketNioTransport != null; } @Override public SocketAddress convert(io.vertx.core.net.SocketAddress address) { - if (address.isDomainSocket() && unixDomainSocketJdkSupport != null) { - return unixDomainSocketJdkSupport.convert(address); + if (address.isDomainSocket() && unixDomainSocketNioTransport != null) { + return unixDomainSocketNioTransport.convert(address); } else { return Transport.super.convert(address); } @@ -46,8 +46,8 @@ public SocketAddress convert(io.vertx.core.net.SocketAddress address) { @Override public io.vertx.core.net.SocketAddress convert(SocketAddress address) { - if (unixDomainSocketJdkSupport != null && unixDomainSocketJdkSupport.isUnixDomainSocketAddress(address)) { - return unixDomainSocketJdkSupport.convert(address); + if (unixDomainSocketNioTransport != null && unixDomainSocketNioTransport.isUnixDomainSocketAddress(address)) { + return unixDomainSocketNioTransport.convert(address); } return Transport.super.convert(address); } @@ -77,7 +77,7 @@ public DatagramChannel datagramChannel(InternetProtocolFamily family) { @Override public ChannelFactory channelFactory(boolean domainSocket) { if (domainSocket) { - if (unixDomainSocketJdkSupport == null) { + if (unixDomainSocketNioTransport == null) { throw new IllegalArgumentException("Domain sockets require JDK 16 and above, or the usage of a native transport"); } return NioDomainSocketChannel::new; @@ -89,7 +89,7 @@ public ChannelFactory channelFactory(boolean domainSocket) { @Override public ChannelFactory serverChannelFactory(boolean domainSocket) { if (domainSocket) { - if (unixDomainSocketJdkSupport == null) { + if (unixDomainSocketNioTransport == null) { throw new IllegalArgumentException("Domain sockets require JDK 16 and above, or the usage of a native transport"); } return NioServerDomainSocketChannel::new; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketNioTransport.java similarity index 87% rename from vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java rename to vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketNioTransport.java index 16c61c95ed6..747ac79283e 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketJdkSupport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/UnixDomainSocketNioTransport.java @@ -12,21 +12,21 @@ import static java.lang.invoke.MethodType.methodType; -class UnixDomainSocketJdkSupport { +class UnixDomainSocketNioTransport { - private static final Logger LOG = LoggerFactory.getLogger(UnixDomainSocketJdkSupport.class); + private static final Logger LOG = LoggerFactory.getLogger(UnixDomainSocketNioTransport.class); private final Class unixDomainSocketAddressClass; private final MethodHandle ofMethodHandle; private final MethodHandle getPathMethodHandle; - private UnixDomainSocketJdkSupport(Class unixDomainSocketAddressClass, MethodHandle ofMethodHandle, MethodHandle getPathMethodHandle) { + private UnixDomainSocketNioTransport(Class unixDomainSocketAddressClass, MethodHandle ofMethodHandle, MethodHandle getPathMethodHandle) { this.unixDomainSocketAddressClass = unixDomainSocketAddressClass; this.ofMethodHandle = ofMethodHandle; this.getPathMethodHandle = getPathMethodHandle; } - static UnixDomainSocketJdkSupport load() { + static UnixDomainSocketNioTransport load() { Class unixDomainSocketAddressClass; MethodHandle ofMethodHandle; MethodHandle getPathMethodHandle; @@ -36,7 +36,7 @@ static UnixDomainSocketJdkSupport load() { MethodHandles.Lookup lookup = MethodHandles.publicLookup(); ofMethodHandle = lookup.findStatic(unixDomainSocketAddressClass, "of", methodType(unixDomainSocketAddressClass, Path.class)); getPathMethodHandle = lookup.findVirtual(unixDomainSocketAddressClass, "getPath", methodType(Path.class)); - return new UnixDomainSocketJdkSupport(unixDomainSocketAddressClass, ofMethodHandle, getPathMethodHandle); + return new UnixDomainSocketNioTransport(unixDomainSocketAddressClass, ofMethodHandle, getPathMethodHandle); } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e) { LOG.warn("JDK Unix Domain Socket support is not available", e); } From f51fed0b5896f7be0c0b9bfd834f49e98b4bd7a1 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 29 Oct 2024 19:13:44 +0100 Subject: [PATCH 3/3] Update documentation of Unix domain sockets support Removed the domain sockets section nested in native transports. Added examples of domain socket client/server in both the net.adoc and http.adoc files, with a reference to the native transports section for those who don't use JDK 16 and above. Signed-off-by: Thomas Segismont --- vertx-core/src/main/asciidoc/http.adoc | 35 +++++++++++---- vertx-core/src/main/asciidoc/index.adoc | 33 +------------- vertx-core/src/main/asciidoc/net.adoc | 20 ++++++++- .../src/main/java/examples/CoreExamples.java | 45 ++++++++++++++----- 4 files changed, 81 insertions(+), 52 deletions(-) diff --git a/vertx-core/src/main/asciidoc/http.adoc b/vertx-core/src/main/asciidoc/http.adoc index 5874776d652..dd5d89f0ef7 100644 --- a/vertx-core/src/main/asciidoc/http.adoc +++ b/vertx-core/src/main/asciidoc/http.adoc @@ -120,6 +120,15 @@ For example: {@link examples.HTTPExamples#example5} ---- +=== Listening to Unix domain sockets + +When running on JDK 16+, or using a <<_native_transports,native transport>>, a server can listen to Unix domain sockets: + +[source,$lang] +---- +{@link examples.CoreExamples#httpServerWithDomainSockets} +---- + === Getting notified of incoming requests To be notified when a request arrives you need to set a {@link io.vertx.core.http.HttpServer#requestHandler}: @@ -872,21 +881,31 @@ the same preface from the server. The http server may not support HTTP/2, the actual version can be checked with {@link io.vertx.core.http.HttpClientResponse#version()} when the response arrives. -When a clients connects to an HTTP/2 server, it sends to the server its {@link io.vertx.core.http.HttpClientOptions#getInitialSettings initial settings}. +When a client connects to an HTTP/2 server, it sends to the server its {@link io.vertx.core.http.HttpClientOptions#getInitialSettings initial settings}. The settings define how the server can use the connection, the default initial settings for a client are the default values defined by the HTTP/2 RFC. +=== Making connections to Unix domain sockets + +When running on JDK 16+, or using a <<_native_transports,native transport>>, a client can connect to Unix domain sockets: + +[source,$lang] +---- +{@link examples.CoreExamples#httpClientWithDomainSockets} +---- + === Pool configuration - For performance purpose, the client uses connection pooling when interacting with HTTP/1.1 servers. The pool creates up - to 5 connections per server. You can override the pool configuration like this: +For performance purpose, the client uses connection pooling when interacting with HTTP/1.1 servers. +The pool creates up to 5 connections per server. +You can override the pool configuration like this: - [source,$lang] - ---- - {@link examples.HTTPExamples#examplePoolConfiguration} - ---- +[source,$lang] +---- +{@link examples.HTTPExamples#examplePoolConfiguration} +---- - You can configure various pool {@link io.vertx.core.http.PoolOptions options} as follows +You can configure various pool {@link io.vertx.core.http.PoolOptions options} as follows - {@link io.vertx.core.http.PoolOptions options#setHttp1MaxSize} the maximum number of opened per HTTP/1.x server (5 by default) - {@link io.vertx.core.http.PoolOptions options#setHttp2MaxSize} the maximum number of opened per HTTP/2 server (1 by default), you *should* not change this value since a single HTTP/2 connection is capable of delivering the same performance level than multiple HTTP/1.x connections diff --git a/vertx-core/src/main/asciidoc/index.adoc b/vertx-core/src/main/asciidoc/index.adoc index fa9828263d0..b8ab388b442 100644 --- a/vertx-core/src/main/asciidoc/index.adoc +++ b/vertx-core/src/main/asciidoc/index.adoc @@ -1013,6 +1013,7 @@ You can add it to your classpath to improve the integration and remove the warni ---- +[#_native_transports] == Native transports Vert.x can run with http://netty.io/wiki/native-transports.html[native transports] (when available) on BSD (OSX) and Linux: @@ -1022,7 +1023,7 @@ Vert.x can run with http://netty.io/wiki/native-transports.html[native transport {@link examples.CoreExamples#configureNative()} ---- -NOTE: preferring native transport will not prevent the application to execute (for example a native dependency might be missing). If your application requires native transport, you need to check {@link io.vertx.core.Vertx#isNativeTransportEnabled()}. +NOTE: Preferring native transport will not prevent the application to execute (for example a native dependency might be missing).If your application requires native transport, you need to check {@link io.vertx.core.Vertx#isNativeTransportEnabled()}. You can also explicitly configure the transport to use: @@ -1104,36 +1105,6 @@ Native on BSD gives you extra networking options: {@link examples.CoreExamples#configureBSDOptions} ---- -=== Domain sockets - -Natives provide domain sockets support for servers: - -[source,$lang] ----- -{@link examples.CoreExamples#tcpServerWithDomainSockets} ----- - -or for http: - -[source,$lang] ----- -{@link examples.CoreExamples#httpServerWithDomainSockets} ----- - -As well as clients: - -[source,$lang] ----- -{@link examples.CoreExamples#tcpClientWithDomainSockets} ----- - -or for http: - -[source,$lang] ----- -{@link examples.CoreExamples#httpClientWithDomainSockets} ----- - == Security notes Vert.x is a toolkit, not an opinionated framework where we force you to do things in a certain way. This gives you diff --git a/vertx-core/src/main/asciidoc/net.adoc b/vertx-core/src/main/asciidoc/net.adoc index 31ca2b1e3dd..8bda021e18a 100644 --- a/vertx-core/src/main/asciidoc/net.adoc +++ b/vertx-core/src/main/asciidoc/net.adoc @@ -1,6 +1,6 @@ == Writing TCP servers and clients -Vert.x allows you to easily write non blocking TCP clients and servers. +Vert.x allows you to easily write non-blocking TCP clients and servers. === Creating a TCP server @@ -65,6 +65,15 @@ To find out the real port the server is listening on you can call {@link io.vert {@link examples.NetExamples#example5_1} ---- +=== Listening to Unix domain sockets + +When running on JDK 16+, or using a <<_native_transports,native transport>>, a server can listen to Unix domain sockets: + +[source,$lang] +---- +{@link examples.CoreExamples#tcpServerWithDomainSockets} +---- + === Getting notified of incoming connections To be notified when a connection is made you need to set a {@link io.vertx.core.net.NetServer#connectHandler(io.vertx.core.Handler)}: @@ -260,6 +269,15 @@ specifying the port and host of the server and a handler that will be called wit {@link examples.NetExamples#example15} ---- +=== Making connections to Unix domain sockets + +When running on JDK 16+, or using a <<_native_transports,native transport>>, a client can connect to Unix domain sockets: + +[source,$lang] +---- +{@link examples.CoreExamples#tcpClientWithDomainSockets} +---- + === Configuring connection attempts A client can be configured to automatically retry connecting to the server in the event that it cannot connect. diff --git a/vertx-core/src/main/java/examples/CoreExamples.java b/vertx-core/src/main/java/examples/CoreExamples.java index c7fd0dba004..0f00ef4f6fb 100644 --- a/vertx-core/src/main/java/examples/CoreExamples.java +++ b/vertx-core/src/main/java/examples/CoreExamples.java @@ -501,23 +501,41 @@ public void configureBSDOptions(Vertx vertx, boolean reusePort) { } public void tcpServerWithDomainSockets(Vertx vertx) { - // Only available on BSD and Linux - vertx.createNetServer().connectHandler(so -> { + NetServer netServer = vertx.createNetServer(); + + // Only available when running on JDK16+, or using a native transport + SocketAddress address = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"); + + netServer + .connectHandler(so -> { // Handle application - }).listen(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock")); + }) + .listen(address) + .onComplete(ar -> { + if (ar.succeeded()) { + // Bound to socket + } else { + // Handle failure + } + }); } public void httpServerWithDomainSockets(Vertx vertx) { - vertx.createHttpServer() + HttpServer httpServer = vertx.createHttpServer(); + + // Only available when running on JDK16+, or using a native transport + SocketAddress address = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"); + + httpServer .requestHandler(req -> { // Handle application }) - .listen(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock")) + .listen(address) .onComplete(ar -> { if (ar.succeeded()) { // Bound to socket } else { - ar.cause().printStackTrace(); + // Handle failure } }); } @@ -525,7 +543,7 @@ public void httpServerWithDomainSockets(Vertx vertx) { public void tcpClientWithDomainSockets(Vertx vertx) { NetClient netClient = vertx.createNetClient(); - // Only available on BSD and Linux + // Only available when running on JDK16+, or using a native transport SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"); // Connect to the server @@ -535,7 +553,7 @@ public void tcpClientWithDomainSockets(Vertx vertx) { if (ar.succeeded()) { // Connected } else { - ar.cause().printStackTrace(); + // Handle failure } }); } @@ -543,7 +561,7 @@ public void tcpClientWithDomainSockets(Vertx vertx) { public void httpClientWithDomainSockets(Vertx vertx) { HttpClient httpClient = vertx.createHttpClient(); - // Only available on BSD and Linux + // Only available when running on JDK16+, or using a native transport SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"); // Send request to the server @@ -552,10 +570,13 @@ public void httpClientWithDomainSockets(Vertx vertx) { .setHost("localhost") .setPort(8080) .setURI("/")) - .onSuccess(request -> { - request.send().onComplete(response -> { + .compose(request -> request.send().compose(HttpClientResponse::body)) + .onComplete(ar -> { + if (ar.succeeded()) { // Process response - }); + } else { + // Handle failure + } }); } }