diff --git a/.github/workflows/ci-5.x.yml b/.github/workflows/ci-5.x.yml index 6023b864909..9e2b308420f 100644 --- a/.github/workflows/ci-5.x.yml +++ b/.github/workflows/ci-5.x.yml @@ -17,14 +17,21 @@ jobs: jdk: 11 - os: ubuntu-latest jdk: 11 - profile: '-PtestNativeTransport' + profile: '-PNativeEpoll' - os: ubuntu-latest jdk: 11 - profile: '-PtestDomainSockets' + profile: '-PNativeIoUring' + - os: ubuntu-latest + jdk: 11 + profile: '-PNativeEpoll+DomainSockets' - os: ubuntu-latest jdk: 21 - os: windows-latest jdk: 11 + stable: true +# - os: macos-latest +# jdk: 11 +# profile: '-PNativeKQueue' uses: ./.github/workflows/ci.yml with: branch: ${{ github.event.pull_request.head.sha || github.ref_name }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 943a6f9c277..c925cea261d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,16 +4,16 @@ on: inputs: branch: required: true - type: string + type: 'string' jdk: default: 8 - type: string + type: 'string' os: default: ubuntu-latest - type: string + type: 'string' profile: default: '' - type: string + type: 'string' jobs: Test: name: Run tests diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index e566e90099c..9515dd43386 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -29,7 +29,7 @@ 2.0.0.AM27 ${project.basedir}/src/main/generated 1.37 - false + jdk false @@ -88,6 +88,11 @@ netty-transport-classes-epoll true + + io.netty + netty-transport-classes-io_uring + true + io.netty netty-transport-classes-kqueue @@ -247,7 +252,7 @@ ${project.build.directory} ${project.version} true - ${vertx.testNativeTransport} + ${vertx.testTransport} ${vertx.testDomainSockets} true @@ -360,29 +365,24 @@ io/vertx/it/transport/TransportTest.java + io.netty:netty-transport-classes-io_uring io.netty:netty-transport-classes-epoll io.netty:netty-transport-classes-kqueue - service-loaded-native + io_uring integration-test verify - io/vertx/it/transport/ServiceLoadedTransportTest.java + io/vertx/it/transport/IoUringTest.java - - ${project.basedir}/src/test/classpath/customtransport - - io.netty:netty-transport-native-epoll io.netty:netty-transport-classes-epoll - io.netty:netty-transport-native-kqueue - io.netty:netty-transport-classes-kqueue @@ -648,26 +648,51 @@ - - testNativeTransport + NativeEpoll + + epoll + false + false + + + + + NativeIoUring - true + io_uring false false - - testDomainSockets + NativeEpoll+DomainSockets - true + epoll true false + + NativeKQueue + + kqueue + false + false + + + + + NativeKQueue+DomainSockets + + kqueue + true + false + + + benchmarks @@ -743,10 +768,35 @@ - linux + linux-x86_64 + + + linux + x86_64 + + + + + io.netty + netty-transport-native-epoll + linux-x86_64 + test + + + io.netty + netty-transport-native-io_uring + linux-x86_64 + test + + + + + + linux-amd64 linux + amd64 @@ -756,6 +806,36 @@ linux-x86_64 test + + io.netty + netty-transport-native-io_uring + linux-x86_64 + test + + + + + + linux-aarch64 + + + linux + aarch64 + + + + + io.netty + netty-transport-native-epoll + linux-aarch_64 + test + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + test + diff --git a/vertx-core/src/main/asciidoc/index.adoc b/vertx-core/src/main/asciidoc/index.adoc index 7b40f02fcaa..fa9828263d0 100644 --- a/vertx-core/src/main/asciidoc/index.adoc +++ b/vertx-core/src/main/asciidoc/index.adoc @@ -1022,10 +1022,24 @@ Vert.x can run with http://netty.io/wiki/native-transports.html[native transport {@link examples.CoreExamples#configureNative()} ---- -NOTE: preferring native transport will not prevent the application to execute (for example if a JAR is missing). -If your application requires native transport, you need to check {@link io.vertx.core.Vertx#isNativeTransportEnabled()}. +NOTE: preferring native transport will not prevent the application to execute (for example a native dependency might be missing). If your application requires native transport, you need to check {@link io.vertx.core.Vertx#isNativeTransportEnabled()}. -=== Native Linux Transport +You can also explicitly configure the transport to use: + +[source,$lang] +---- +{@link examples.CoreExamples#configureTransport()} +---- + +=== Native epoll + +Native on Linux gives you extra networking options: + +* `SO_REUSEPORT` +* `TCP_QUICKACK` +* `TCP_CORK` +* `TCP_FASTOPEN` +* `TCP_USER_TIMEOUT` You need to add the following dependency in your classpath: @@ -1039,20 +1053,21 @@ You need to add the following dependency in your classpath: ---- -Native on Linux gives you extra networking options: +=== Native io_uring -* `SO_REUSEPORT` -* `TCP_QUICKACK` -* `TCP_CORK` -* `TCP_FASTOPEN` -* `TCP_USER_TIMEOUT` +You need to add the following dependency in your classpath: -[source,$lang] +[source,xml] ---- -{@link examples.CoreExamples#configureLinuxOptions} + + io.netty + linux-x86_64 + netty-transport-native-io_uring + + ---- -=== Native BSD Transport +=== Native kqueue You need to add the following dependency in your classpath: diff --git a/vertx-core/src/main/java/examples/CoreExamples.java b/vertx-core/src/main/java/examples/CoreExamples.java index 116ed6bd126..2e71d236574 100644 --- a/vertx-core/src/main/java/examples/CoreExamples.java +++ b/vertx-core/src/main/java/examples/CoreExamples.java @@ -25,6 +25,7 @@ import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.transport.Transport; import java.util.Arrays; import java.util.concurrent.TimeUnit; @@ -472,6 +473,19 @@ public void configureNative() { System.out.println("Running with native: " + usingNative); } + public void configureTransport() { + + // Use epoll/kqueue/io_uring native transport depending on OS + Transport transport = Transport.nativeTransport(); + + // Or use a very specific transport + transport = Transport.EPOLL; + + Vertx vertx = Vertx.builder() + .withTransport(transport) + .build(); + } + public void configureLinuxOptions(Vertx vertx, boolean fastOpen, boolean cork, boolean quickAck, boolean reusePort) { // Available on Linux vertx.createHttpServer(new HttpServerOptions() diff --git a/vertx-core/src/main/java/io/vertx/core/Vertx.java b/vertx-core/src/main/java/io/vertx/core/Vertx.java index 6e7f642b1df..c6e7cdba84f 100644 --- a/vertx-core/src/main/java/io/vertx/core/Vertx.java +++ b/vertx-core/src/main/java/io/vertx/core/Vertx.java @@ -20,6 +20,7 @@ import io.vertx.core.file.FileSystem; import io.vertx.core.http.*; import io.vertx.core.impl.VertxImpl; +import io.vertx.core.impl.transports.TransportInternal; import io.vertx.core.internal.ContextInternal; import io.vertx.core.dns.impl.DnsAddressResolverProvider; import io.vertx.core.internal.VertxBootstrap; @@ -33,6 +34,7 @@ import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.VertxTracerFactory; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.transport.Transport; import java.util.Set; import java.util.concurrent.Callable; @@ -79,6 +81,7 @@ static io.vertx.core.VertxBuilder builder() { private ClusterManager clusterManager; private VertxMetricsFactory metricsFactory; private VertxTracerFactory tracerFactory; + private Transport transport; @Override public io.vertx.core.VertxBuilder with(VertxOptions options) { this.options = options; @@ -100,27 +103,39 @@ public io.vertx.core.VertxBuilder withClusterManager(ClusterManager clusterManag return this; } @Override - public Vertx build() { - VertxBootstrap builder = VertxBootstrap.create(); + public VertxBuilder withTransport(Transport transport) { + this.transport = transport; + return this; + } + private VertxBootstrap bootstrap() { + VertxBootstrap bootstrap = VertxBootstrap.create(); if (options != null) { - builder.options(options); + bootstrap.options(options); + } + bootstrap.metricsFactory(metricsFactory); + bootstrap.tracerFactory(tracerFactory); + Transport tr = transport; + if (tr == null && options != null && options.getPreferNativeTransport()) { + tr = Transport.nativeTransport(); + } + if (tr == null) { + tr = Transport.NIO; } - builder.metricsFactory(metricsFactory); - builder.tracerFactory(tracerFactory); - builder.init(); - return builder.vertx(); + bootstrap.transport(tr.implementation()); + return bootstrap; + } + @Override + public Vertx build() { + return bootstrap() + .init() + .vertx(); } @Override public Future buildClustered() { - VertxBootstrap builder = VertxBootstrap.create(); - if (options != null) { - builder.options(options); - } - builder.clusterManager(clusterManager); - builder.metricsFactory(metricsFactory); - builder.tracerFactory(tracerFactory); - builder.init(); - return builder.clusteredVertx(); + return bootstrap() + .clusterManager(clusterManager) + .init() + .clusteredVertx(); } }; } @@ -141,7 +156,7 @@ static Vertx vertx() { * @return the instance */ static Vertx vertx(VertxOptions options) { - return VertxBootstrap.create().options(options).init().vertx(); + return builder().with(options).build(); } /** @@ -153,7 +168,7 @@ static Vertx vertx(VertxOptions options) { * @return a future completed with the clustered vertx */ static Future clusteredVertx(VertxOptions options) { - return VertxBootstrap.create().options(options).init().clusteredVertx(); + return builder().with(options).buildClustered(); } /** diff --git a/vertx-core/src/main/java/io/vertx/core/VertxBuilder.java b/vertx-core/src/main/java/io/vertx/core/VertxBuilder.java index 87a27322824..f05179f0cf5 100644 --- a/vertx-core/src/main/java/io/vertx/core/VertxBuilder.java +++ b/vertx-core/src/main/java/io/vertx/core/VertxBuilder.java @@ -7,6 +7,7 @@ import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.VertxTracerFactory; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.transport.Transport; /** * A builder for creating Vert.x instances, allowing to configure Vert.x plugins: @@ -64,6 +65,16 @@ public interface VertxBuilder { @Fluent VertxBuilder withTracer(VertxTracerFactory factory); + /** + * Programmatically set the transport, this overrides {@link VertxOptions#setPreferNativeTransport(boolean)} + * + * @param transport the transport + * @return a reference to this, so the API can be used fluently + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + @Fluent + VertxBuilder withTransport(Transport transport); + /** * Programmatically set the cluster manager to be used when clustering. *

diff --git a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java index ec2dbb38453..0f577ca2e44 100644 --- a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java @@ -521,14 +521,14 @@ public VertxOptions setAddressResolverOptions(AddressResolverOptions addressReso } /** - * @return wether to prefer the native transport to the JDK transport + * @return whether to prefer the native transport to the NIO transport */ public boolean getPreferNativeTransport() { return preferNativeTransport; } /** - * Set wether to prefer the native transport to the JDK transport. + * Set whether to prefer the native transport to the NIO transport. * * @param preferNativeTransport {@code true} to prefer the native transport * @return a reference to this, so the API can be used fluently diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java index 82cbd6675b5..9bb52c09d9f 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java @@ -144,7 +144,7 @@ protected void handleClosed() { if (timeout != null) { timeout.cancel(false); } - if (closePromise != null && !closePromise.isDone()) { + if (closePromise != null) { closePromise.setSuccess(); } Object metric = null; @@ -203,7 +203,9 @@ private void finishClose() { ScheduledFuture timeout = closingTimeout; if (timeout == null || timeout.cancel(false)) { closingTimeout = null; - super.handleClose(closeReason, closePromise); + ChannelPromise p = closePromise; + closePromise = null; + super.handleClose(closeReason, p); } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/Utils.java b/vertx-core/src/main/java/io/vertx/core/impl/Utils.java index 9b604093d99..01c90b2d24f 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/Utils.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/Utils.java @@ -25,26 +25,35 @@ public class Utils { private static final boolean isLinux; private static final boolean isWindows; + private static final boolean isOsx; static { isLinux = "linux".equals(PlatformDependent.normalizedOs()); isWindows = PlatformDependent.isWindows(); + isOsx = PlatformDependent.isOsx(); } /** - * @return true, if running on Linux + * @return {@code true}, if running on Linux */ public static boolean isLinux() { return isLinux; } /** - * @return true, if running on Windows + * @return {@code true}, if running on Windows */ public static boolean isWindows() { return isWindows; } + /** + * @return {@code true}, if running on Mac + */ + public static boolean isOsx() { + return isOsx; + } + @SuppressWarnings("unchecked") public static void throwAsUnchecked(Throwable t) throws E { throw (E) t; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java index 9770ef70a7c..a1873c66cac 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java @@ -12,9 +12,7 @@ package io.vertx.core.impl; import io.vertx.core.*; -import io.vertx.core.impl.transports.EpollTransport; -import io.vertx.core.impl.transports.JDKTransport; -import io.vertx.core.impl.transports.KQueueTransport; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.internal.VertxBootstrap; import io.vertx.core.spi.context.executor.EventExecutorProvider; import io.vertx.core.spi.file.FileResolver; @@ -35,7 +33,6 @@ import io.vertx.core.spi.tracing.VertxTracer; import java.util.Collection; -import java.util.Iterator; import java.util.List; /** @@ -50,7 +47,6 @@ public class VertxBootstrapImpl implements VertxBootstrap { private VertxOptions options; private JsonObject config; private Transport transport; - private Throwable transportUnavailabilityCause; private EventExecutorProvider eventExecutorProvider; private ClusterManager clusterManager; private NodeSelector clusterNodeSelector; @@ -211,20 +207,34 @@ public VertxBootstrapImpl executorServiceFactory(ExecutorServiceFactory factory) return this; } - public Vertx vertx() { + private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector nodeSelector) { checkBeforeInstantiating(); - VertxImpl vertx = new VertxImpl( + Transport tr = transport; + Throwable transportUnavailabilityCause = null; + if (tr != null) { + if (!tr.isAvailable()) { + transportUnavailabilityCause = tr.unavailabilityCause(); + tr = NioTransport.INSTANCE; + } + } else { + tr = NioTransport.INSTANCE; + } + return new VertxImpl( options, - null, - null, + clusterManager, + nodeSelector, metrics, tracer, - transport, + tr, transportUnavailabilityCause, fileResolver, threadFactory, executorServiceFactory, eventExecutorProvider); + } + + public Vertx vertx() { + VertxImpl vertx = instantiateVertx(null, null); vertx.init(); return vertx; } @@ -233,22 +243,14 @@ public Vertx vertx() { * Build and return the clustered vertx instance */ public Future clusteredVertx() { - checkBeforeInstantiating(); if (clusterManager == null) { throw new IllegalStateException("No ClusterManagerFactory instances found on classpath"); } - VertxImpl vertx = new VertxImpl( - options, - clusterManager, - clusterNodeSelector == null ? new DefaultNodeSelector() : clusterNodeSelector, - metrics, - tracer, - transport, - transportUnavailabilityCause, - fileResolver, - threadFactory, - executorServiceFactory, - eventExecutorProvider); + NodeSelector nodeSelector = clusterNodeSelector; + if (nodeSelector == null) { + nodeSelector = new DefaultNodeSelector(); + } + VertxImpl vertx = instantiateVertx(clusterManager, nodeSelector); return vertx.initClustered(options); } @@ -294,20 +296,6 @@ private void initTracing() { } private void initTransport() { - if (transport != null) { - return; - } - Transport t = findTransport(options.getPreferNativeTransport()); - if (t != null) { - if (t.isAvailable()) { - transport = t; - } else { - transport = JDKTransport.INSTANCE; - transportUnavailabilityCause = t.unavailabilityCause(); - } - } else { - transport = JDKTransport.INSTANCE; - } } private void initFileResolver() { @@ -351,48 +339,4 @@ private void checkMetrics() { "contains the factory FQCN, or metricsOptions.getFactory() returns a non null value"); } } - - /** - * The native transport, it may be {@code null} or failed. - */ - public static Transport nativeTransport() { - Transport transport = null; - try { - Transport epoll = new EpollTransport(); - if (epoll.isAvailable()) { - return epoll; - } else { - transport = epoll; - } - } catch (Throwable ignore) { - // Jar not here - } - try { - Transport kqueue = new KQueueTransport(); - if (kqueue.isAvailable()) { - return kqueue; - } else if (transport == null) { - transport = kqueue; - } - } catch (Throwable ignore) { - // Jar not here - } - return transport; - } - - static Transport findTransport(boolean preferNative) { - if (preferNative) { - Collection transports = ServiceHelper.loadFactories(Transport.class); - Iterator it = transports.iterator(); - while (it.hasNext()) { - Transport transport = it.next(); - if (transport.isAvailable()) { - return transport; - } - } - return nativeTransport(); - } else { - return JDKTransport.INSTANCE; - } - } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index ccfc8614b0e..5512c91588e 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -43,7 +43,7 @@ import io.vertx.core.internal.threadchecker.BlockedThreadChecker; import io.vertx.core.net.*; import io.vertx.core.net.impl.*; -import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.spi.context.executor.EventExecutorProvider; import io.vertx.core.spi.file.FileResolver; import io.vertx.core.file.impl.FileSystemImpl; @@ -370,7 +370,7 @@ public Cleaner cleaner() { @Override public boolean isNativeTransportEnabled() { - return !(transport instanceof JDKTransport); + return !(transport instanceof NioTransport); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java new file mode 100644 index 00000000000..5660d389076 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.transports; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.channel.uring.*; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.net.ClientOptionsBase; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.SocketAddressImpl; +import io.vertx.core.spi.transport.Transport; + +import java.net.SocketAddress; + +/** + * @author Julien Viet + */ +public class IoUringTransport implements Transport { + + private static volatile int pendingFastOpenRequestsThreshold = 256; + + /** + * Return the number of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN. + *

+ * {@see #setPendingFastOpenRequestsThreshold} + */ + public static int getPendingFastOpenRequestsThreshold() { + return pendingFastOpenRequestsThreshold; + } + + /** + * Set the number of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN + *

+ * If this value goes over a certain limit the server disables all TFO connections. + */ + public static void setPendingFastOpenRequestsThreshold(int value) { + if (value < 0) { + throw new IllegalArgumentException("Invalid " + value); + } + pendingFastOpenRequestsThreshold = value; + } + + public IoUringTransport() { + } + + @Override + public boolean supportsDomainSockets() { + return false; + } + + @Override + public boolean supportFileRegion() { + return false; + } + + @Override + public SocketAddress convert(io.vertx.core.net.SocketAddress address) { + if (address.isDomainSocket()) { + throw new IllegalArgumentException("Domain socket not supported by IOUring transport"); + } + return Transport.super.convert(address); + } + + @Override + public io.vertx.core.net.SocketAddress convert(SocketAddress address) { + if (address instanceof DomainSocketAddress) { + return new SocketAddressImpl(((DomainSocketAddress) address).path()); + } + return Transport.super.convert(address); + } + + @Override + public boolean isAvailable() { + return IoUring.isAvailable(); + } + + @Override + public Throwable unavailabilityCause() { + return IoUring.unavailabilityCause(); + } + + @Override + public IoHandlerFactory ioHandlerFactory() { + return IoUringIoHandler.newFactory(); + } + + @Override + public DatagramChannel datagramChannel() { + return new IoUringDatagramChannel(); + } + + @Override + public DatagramChannel datagramChannel(InternetProtocolFamily family) { + return new IoUringDatagramChannel(); + } + + @Override + public ChannelFactory channelFactory(boolean domainSocket) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + return IoUringSocketChannel::new; + } + + @Override + public ChannelFactory serverChannelFactory(boolean domainSocket) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + return IoUringServerSocketChannel::new; + } + + @Override + public void configure(DatagramChannel channel, DatagramSocketOptions options) { + channel.config().setOption(IoUringChannelOption.SO_REUSEPORT, options.isReusePort()); + Transport.super.configure(channel, options); + } + + @Override + public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + bootstrap.option(IoUringChannelOption.SO_REUSEPORT, options.isReusePort()); + if (options.isTcpFastOpen()) { + bootstrap.option(IoUringChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0); + } + bootstrap.childOption(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.childOption(IoUringChannelOption.TCP_CORK, options.isTcpCork()); + Transport.super.configure(options, false, bootstrap); + } + + @Override + public void configure(ClientOptionsBase options, int connectTimeout, boolean domainSocket, Bootstrap bootstrap) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + if (options.isTcpFastOpen()) { + bootstrap.option(IoUringChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen()); + } + bootstrap.option(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.option(IoUringChannelOption.TCP_CORK, options.isTcpCork()); + Transport.super.configure(options, connectTimeout, false, bootstrap); + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/JDKTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java similarity index 85% rename from vertx-core/src/main/java/io/vertx/core/impl/transports/JDKTransport.java rename to vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index e214c118b0b..51068bd60f2 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/JDKTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -19,11 +19,11 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.vertx.core.spi.transport.Transport; -public class JDKTransport implements Transport { +public class NioTransport implements Transport { /** - * The JDK transport, always there. + * The NIO transport, always there. */ - public static final Transport INSTANCE = new JDKTransport(); + public static final Transport INSTANCE = new NioTransport(); @Override public IoHandlerFactory ioHandlerFactory() { @@ -54,7 +54,7 @@ public ChannelFactory channelFactory(boolean domainSocket) { public ChannelFactory serverChannelFactory(boolean domainSocket) { if (domainSocket) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("The Vertx instance must be created with the preferNativeTransport option set to true to create domain sockets"); } return NioServerSocketChannel::new; } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportInternal.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportInternal.java new file mode 100644 index 00000000000..a96be8cbfb7 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportInternal.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.transports; + +import io.vertx.core.transport.Transport; + +public final class TransportInternal implements Transport { + + private final String name; + private final boolean available; + private final Throwable unavailabilityCause; + private final io.vertx.core.spi.transport.Transport implementation; + + public TransportInternal(String name, + boolean available, + Throwable unavailabilityCause, + io.vertx.core.spi.transport.Transport implementation) { + this.name = name; + this.available = available; + this.unavailabilityCause = unavailabilityCause; + this.implementation = implementation; + } + + @Override + public String name() { + return name; + } + + @Override + public boolean available() { + return available; + } + + @Override + public Throwable unavailabilityCause() { + return unavailabilityCause; + } + + /** + * @return the transport implementation + */ + public io.vertx.core.spi.transport.Transport implementation() { + return implementation; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportLoader.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportLoader.java new file mode 100644 index 00000000000..20022f42b43 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/TransportLoader.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.transports; + +import io.vertx.core.transport.Transport; + +/** + * @author Julien Viet + */ +public class TransportLoader { + + public static Transport epoll() { + try { + EpollTransport impl = new EpollTransport(); + boolean available = impl.isAvailable(); + Throwable unavailabilityCause = impl.unavailabilityCause(); + return new TransportInternal("epoll", available, unavailabilityCause, impl); + } catch (Throwable ignore) { + // Jar not here + } + return null; + } + + public static Transport io_uring() { + try { + IoUringTransport impl = new IoUringTransport(); + boolean available = impl.isAvailable(); + Throwable unavailabilityCause = impl.unavailabilityCause(); + return new TransportInternal("io_uring", available, unavailabilityCause, impl); + } catch (Throwable ignore) { + // Jar not here + } + return null; + } + + public static Transport kqueue() { + try { + KQueueTransport impl = new KQueueTransport(); + boolean available = impl.isAvailable(); + Throwable unavailabilityCause = impl.unavailabilityCause(); + return new TransportInternal("kqueue", available, unavailabilityCause, impl); + } catch (Throwable ignore) { + // Jar not here + } + return null; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java index 787c5ab85ac..747a7c6fb06 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java @@ -21,7 +21,7 @@ import io.vertx.core.net.NetServerOptions; import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator; import io.vertx.core.net.impl.SocketAddressImpl; -import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.impl.transports.NioTransport; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -65,7 +65,7 @@ default Throwable unavailabilityCause() { default SocketAddress convert(io.vertx.core.net.SocketAddress address) { if (address.isDomainSocket()) { - throw new IllegalArgumentException("Domain socket are not supported by JDK transport, you need to use native transport to use them"); + throw new IllegalArgumentException("Domain socket are not supported by NIO transport, you need to use native transport to use them"); } else { InetAddress ip = ((SocketAddressImpl) address).ipAddress(); if (ip != null) { @@ -134,7 +134,7 @@ default void configure(DatagramChannel channel, DatagramSocketOptions options) { channel.config().setTrafficClass(options.getTrafficClass()); } channel.config().setBroadcast(options.isBroadcast()); - if (this instanceof JDKTransport) { + if (this instanceof NioTransport) { channel.config().setLoopbackModeDisabled(options.isLoopbackModeDisabled()); if (options.getMulticastTimeToLive() != -1) { channel.config().setTimeToLive(options.getMulticastTimeToLive()); diff --git a/vertx-core/src/main/java/io/vertx/core/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java new file mode 100644 index 00000000000..35fb6bc390b --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.transport; + +import io.vertx.core.impl.Utils; +import io.vertx.core.impl.transports.NioTransport; +import io.vertx.core.impl.transports.TransportInternal; +import io.vertx.core.impl.transports.TransportLoader; + +/** + * The transport used by a {@link io.vertx.core.Vertx} instance. + * + * @author Julien Viet + */ +public interface Transport { + + /** + * Nio transport, always available based on ${code java.nio} API. + */ + Transport NIO = new TransportInternal("nio", true, null, NioTransport.INSTANCE); + + /** + * Native transport based on Netty native kqueue transport. + */ + Transport KQUEUE = TransportLoader.kqueue(); + + /** + * Native transport based on Netty native epoll transport. + */ + Transport EPOLL = TransportLoader.epoll(); + + /** + * Native transport based on Netty native io_uring transport. + */ + Transport IO_URING = TransportLoader.io_uring(); + + /** + * @return the name among {@code nio, kqueue, epoll, io_uring} + */ + String name(); + + /** + * Return a native transport suitable for the OS + * + *

+ * + * @return a native transport, it might return an unavailable transport ({@link Transport#available()}) then {@link Transport#unavailabilityCause()} + * can be used to check the error preventing its unsafe, {@code null} can be returned when no native transport can be loaded. + */ + static Transport nativeTransport() { + Transport transport; + if (Utils.isLinux()) { + transport = EPOLL; + if (transport != null) { + if (!transport.available() && IO_URING != null && IO_URING.available()) { + transport = IO_URING; + } + } else { + transport = IO_URING; + } + } else if (Utils.isOsx()) { + transport = KQUEUE; + } else { + transport = null; + } + return transport; + } + + /** + * @return whether the transport can be used by a Vert.x instance + */ + boolean available(); + + /** + * @return the unavailability cause when {#link {@link #available()}} returns true, otherwise {@code null} + */ + Throwable unavailabilityCause(); + + /** + * @return the implementation + */ + io.vertx.core.spi.transport.Transport implementation(); +} diff --git a/vertx-core/src/main/java/module-info.java b/vertx-core/src/main/java/module-info.java index 99dabb11867..e0ce7ede921 100644 --- a/vertx-core/src/main/java/module-info.java +++ b/vertx-core/src/main/java/module-info.java @@ -21,6 +21,7 @@ // Optional requires static com.fasterxml.jackson.databind; + requires static io.netty.transport.classes.io_uring; requires static io.netty.transport.classes.epoll; requires static io.netty.transport.classes.kqueue; requires static io.netty.transport.unix.common; @@ -42,7 +43,6 @@ uses io.vertx.core.spi.VertxServiceProvider; uses io.vertx.core.spi.VerticleFactory; uses io.vertx.core.spi.JsonFactory; - uses io.vertx.core.spi.transport.Transport; // API @@ -64,6 +64,7 @@ exports io.vertx.core.streams; exports io.vertx.core.spi; exports io.vertx.core.file; + exports io.vertx.core.transport; // SPI @@ -94,6 +95,7 @@ // Testing + exports io.vertx.core.impl to io.vertx.core.tests; exports io.vertx.core.impl.cpu to io.vertx.core.tests; exports io.vertx.core.impl.future to io.vertx.core.tests; exports io.vertx.core.impl.utils to io.vertx.core.tests; @@ -114,6 +116,5 @@ exports io.vertx.core.spi.cluster.impl.selector to io.vertx.core.tests; exports io.vertx.core.impl.verticle to io.vertx.core.tests; exports io.vertx.core.impl.deployment to io.vertx.core.tests; - exports io.vertx.core.impl to io.vertx.core.tests; } diff --git a/vertx-core/src/test/classpath/customtransport/META-INF/services/io.vertx.core.spi.transport.Transport b/vertx-core/src/test/classpath/customtransport/META-INF/services/io.vertx.core.spi.transport.Transport deleted file mode 100644 index 62c342da433..00000000000 --- a/vertx-core/src/test/classpath/customtransport/META-INF/services/io.vertx.core.spi.transport.Transport +++ /dev/null @@ -1 +0,0 @@ -io.vertx.it.transport.CustomTransport diff --git a/vertx-core/src/test/java/io/vertx/it/transport/CustomTransport.java b/vertx-core/src/test/java/io/vertx/it/transport/CustomTransport.java deleted file mode 100644 index 6962a4e90bc..00000000000 --- a/vertx-core/src/test/java/io/vertx/it/transport/CustomTransport.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.vertx.it.transport; - -import io.netty.channel.*; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.InternetProtocolFamily; -import io.vertx.core.impl.transports.JDKTransport; -import io.vertx.core.spi.transport.Transport; - -public class CustomTransport implements Transport { - - @Override - public boolean isAvailable() { - return true; - } - - @Override - public Throwable unavailabilityCause() { - return null; - } - - @Override - public IoHandlerFactory ioHandlerFactory() { - return JDKTransport.INSTANCE.ioHandlerFactory(); - } - - @Override - public DatagramChannel datagramChannel() { - throw new UnsupportedOperationException(); - } - - @Override - public DatagramChannel datagramChannel(InternetProtocolFamily family) { - throw new UnsupportedOperationException(); - } - - @Override - public ChannelFactory channelFactory(boolean domainSocket) { - throw new UnsupportedOperationException(); - } - - @Override - public ChannelFactory serverChannelFactory(boolean domainSocket) { - throw new UnsupportedOperationException(); - } -} diff --git a/vertx-core/src/test/java/io/vertx/it/transport/IoUringTest.java b/vertx-core/src/test/java/io/vertx/it/transport/IoUringTest.java new file mode 100644 index 00000000000..a986c9b1895 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/it/transport/IoUringTest.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.it.transport; + +import io.vertx.core.impl.Utils; +import io.vertx.core.transport.Transport; +import io.vertx.test.core.AsyncTestBase; +import org.junit.Test; + +public class IoUringTest extends AsyncTestBase { + @Test + public void testNativeTransportFallback() { + if (Utils.isLinux()) { + Transport nativeTransport = Transport.nativeTransport(); + assertEquals("io_uring", nativeTransport.name()); + } + } +} diff --git a/vertx-core/src/test/java/io/vertx/it/transport/ServiceLoadedTransportTest.java b/vertx-core/src/test/java/io/vertx/it/transport/ServiceLoadedTransportTest.java deleted file mode 100644 index bd85eee2579..00000000000 --- a/vertx-core/src/test/java/io/vertx/it/transport/ServiceLoadedTransportTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ - -package io.vertx.it.transport; - -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.internal.VertxInternal; -import io.vertx.test.core.AsyncTestBase; -import org.junit.Test; - -/** - * @author Julien Viet - */ -public class ServiceLoadedTransportTest extends AsyncTestBase { - - private Vertx vertx; - - @Override - protected void tearDown() throws Exception { - close(vertx); - super.tearDown(); - } - - - @Test - public void testFallbackOnJDK() throws Exception { - vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); - assertTrue(vertx.isNativeTransportEnabled()); - assertNotSame(CustomTransport.class, ((VertxInternal)vertx).transport()); - } -} diff --git a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java index c8f39f03fe7..31581abc39d 100644 --- a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java @@ -21,6 +21,7 @@ import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingOptions; +import io.vertx.core.transport.Transport; import io.vertx.test.fakecluster.FakeClusterManager; import junit.framework.AssertionFailedError; import org.junit.Assert; @@ -31,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -41,11 +43,76 @@ */ public class VertxTestBase extends AsyncTestBase { - public static final boolean USE_NATIVE_TRANSPORT = Boolean.getBoolean("vertx.useNativeTransport"); + public static final Transport TRANSPORT; public static final boolean USE_DOMAIN_SOCKETS = Boolean.getBoolean("vertx.useDomainSockets"); public static final boolean USE_JAVA_MODULES = VertxTestBase.class.getModule().isNamed(); private static final Logger log = LoggerFactory.getLogger(VertxTestBase.class); + static { + + Transport transport = null; + String transportName = System.getProperty("vertx.transport"); + if (transportName != null) { + String t = transportName.toLowerCase(Locale.ROOT); + switch (t) { + case "jdk": + transport = Transport.NIO; + break; + case "kqueue": + transport = Transport.KQUEUE; + break; + case "epoll": + transport = Transport.EPOLL; + break; + case "io_uring": + transport = Transport.IO_URING; + break; + default: + transport = new Transport() { + @Override + public String name() { + return transportName; + } + @Override + public boolean available() { + return false; + } + @Override + public Throwable unavailabilityCause() { + return new RuntimeException("Transport " + transportName + " does not exist"); + } + @Override + public io.vertx.core.spi.transport.Transport implementation() { + throw new IllegalStateException("Transport " + transportName + " does not exist"); + } + }; + } + if (transport == null) { + transport = new Transport() { + @Override + public String name() { + return t; + } + @Override + public boolean available() { + return false; + } + @Override + public Throwable unavailabilityCause() { + return new IllegalStateException("Transport " + t + " not available"); + } + @Override + public io.vertx.core.spi.transport.Transport implementation() { + return null; + } + }; + } + } else { + transport = Transport.NIO; + } + TRANSPORT = transport; + } + @Rule public RepeatRule repeatRule = new RepeatRule(); @@ -92,9 +159,7 @@ protected VertxMetricsFactory getMetrics() { } protected VertxOptions getOptions() { - VertxOptions options = new VertxOptions(); - options.setPreferNativeTransport(USE_NATIVE_TRANSPORT); - return options; + return new VertxOptions(); } protected void tearDown() throws Exception { @@ -141,11 +206,18 @@ protected VertxBuilder createVertxBuilder(VertxOptions options) { builder.withMetrics(metrics); options = new VertxOptions(options).setMetricsOptions(new MetricsOptions().setEnabled(true)); } + builder.withTransport(TRANSPORT); return builder.with(options); } protected Vertx createVertx(VertxOptions options) { - return createVertxBuilder(options).build(); + Vertx vertx = createVertxBuilder(options).build(); + if (TRANSPORT != Transport.NIO) { + if (!vertx.isNativeTransportEnabled()) { + fail(vertx.unavailableNativeTransportCause()); + } + } + return vertx; } /** diff --git a/vertx-core/src/test/java/io/vertx/test/http/HttpTestBase.java b/vertx-core/src/test/java/io/vertx/test/http/HttpTestBase.java index 97b931fc8f1..6811955ac35 100644 --- a/vertx-core/src/test/java/io/vertx/test/http/HttpTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/http/HttpTestBase.java @@ -15,6 +15,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.http.*; +import io.vertx.core.impl.transports.TransportInternal; import io.vertx.core.net.ProxyType; import io.vertx.core.net.SocketAddress; import io.vertx.test.core.TestUtils; @@ -74,7 +75,7 @@ public void setUp() throws Exception { */ protected void configureDomainSockets() throws Exception { if (USE_DOMAIN_SOCKETS) { - assertTrue("Native transport not enabled", USE_NATIVE_TRANSPORT); + assertTrue("Native transport not enabled", TRANSPORT.implementation().supportsDomainSockets()); tmp = TestUtils.tmpFile(".sock"); testAddress = SocketAddress.domainSocketAddress(tmp.getAbsolutePath()); requestOptions.setServer(testAddress); diff --git a/vertx-core/src/test/java/io/vertx/test/proxy/HttpProxy.java b/vertx-core/src/test/java/io/vertx/test/proxy/HttpProxy.java index e1a095c0df6..3cb442cd331 100644 --- a/vertx-core/src/test/java/io/vertx/test/proxy/HttpProxy.java +++ b/vertx-core/src/test/java/io/vertx/test/proxy/HttpProxy.java @@ -84,16 +84,16 @@ public HttpProxy start(Vertx vertx) throws Exception { String auth = request.getHeader("Proxy-Authorization"); String expected = "Basic " + Base64.getEncoder().encodeToString((username + ":" + username).getBytes()); if (auth == null || !auth.equals(expected)) { - request.response().setStatusCode(407).end("proxy authentication failed"); + request.response().setStatusCode(407).end("Proxy authentication failed"); return; } } lastRequestHeaders = HttpHeaders.headers().addAll(request.headers()); if (error != 0) { - request.response().setStatusCode(error).end("proxy request failed"); + request.response().setStatusCode(error).end("Proxy request failed"); } else if (method == HttpMethod.CONNECT) { if (!uri.contains(":")) { - request.response().setStatusCode(403).end("invalid request"); + request.response().setStatusCode(403).end("Invalid request"); } else { lastUri = uri; lastMethod = HttpMethod.CONNECT; diff --git a/vertx-core/src/test/java/io/vertx/tests/addressresolver/ResolvingHttpClientTest.java b/vertx-core/src/test/java/io/vertx/tests/addressresolver/ResolvingHttpClientTest.java index 70deb6eb6c1..c3fa98453f4 100644 --- a/vertx-core/src/test/java/io/vertx/tests/addressresolver/ResolvingHttpClientTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/addressresolver/ResolvingHttpClientTest.java @@ -329,15 +329,16 @@ public void testStatistics() throws Exception { .withAddressResolver(resolver) .withLoadBalancer(lb) .build(); - awaitFuture(client.request(new RequestOptions().setServer(new FakeAddress("example.com"))).compose(req -> req - .send() - .expecting(HttpResponseExpectation.SC_OK) - .compose(HttpClientResponse::body) - )); + client.request(new RequestOptions().setServer(new FakeAddress("example.com"))) + .compose(req -> req + .send() + .expecting(HttpResponseExpectation.SC_OK) + .compose(HttpClientResponse::body) + ).await(); FakeLoadBalancer.FakeLoadBalancerMetrics endpoint = (FakeLoadBalancer.FakeLoadBalancerMetrics) ((EndpointServer) lb.endpoints().get(0)).metrics(); FakeLoadBalancer.FakeMetric metric = endpoint.metrics2().get(0); assertTrue(metric.requestEnd() - metric.requestBegin() >= 0); - assertTrue(metric.responseBegin() - metric.requestEnd() > 500); + assertTrue(metric.responseBegin() - metric.requestEnd() >= 500); assertTrue(metric.responseEnd() - metric.responseBegin() >= 0); } diff --git a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java index a9d965823b4..181069ce249 100644 --- a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java @@ -21,6 +21,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetworkOptions; import io.vertx.core.streams.WriteStream; +import io.vertx.core.transport.Transport; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import io.vertx.test.netty.TestLoggerFactory; @@ -251,7 +252,7 @@ public void testSendAfterCloseFails() { @Test public void testBroadcast() { - if (USE_NATIVE_TRANSPORT) { + if (TRANSPORT != Transport.NIO) { return; } peer1 = vertx.createDatagramSocket(new DatagramSocketOptions().setBroadcast(true)); @@ -318,7 +319,7 @@ private void testMulticastJoinLeave(String bindAddress, DatagramSocketOptions options2, BiConsumer>> join, BiConsumer>> leave) { - if (USE_NATIVE_TRANSPORT) { + if (TRANSPORT != Transport.NIO) { return; } Buffer buffer = Buffer.buffer("HELLO"); diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java index ab61d4a91e3..525e02d4016 100644 --- a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java @@ -76,6 +76,7 @@ public void start() { @Test public void testExecuteBlocking() { Assume.assumeTrue(isVirtualThreadAvailable()); + Promise p = Promise.promise(); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() { @@ -83,12 +84,17 @@ public void start() { assertTrue(isVirtual(Thread.currentThread())); return Thread.currentThread().getName(); }); - String res = fut.await(); + String res; + try { + res = fut.await(); + } catch (Exception e) { + p.fail(e); + return; + } assertNotSame(Thread.currentThread().getName(), res); - testComplete(); + p.complete(); } - }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)); - await(); + }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)).await(); } @Test diff --git a/vertx-core/src/test/java/io/vertx/tests/http/Http1xProxyTest.java b/vertx-core/src/test/java/io/vertx/tests/http/Http1xProxyTest.java index 9b3add0f99f..cc2867014d6 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/Http1xProxyTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/Http1xProxyTest.java @@ -309,7 +309,7 @@ public void testHttpProxyPooling() throws Exception { .setHost("localhost") .setPort(proxy2.port()); List res = testPooling(req1, req2, proxy1, proxy2); - assertEquals(Arrays.asList(proxy1.lastLocalAddress(), proxy2.lastLocalAddress()), res); + assertEquals(Set.of(proxy1.lastLocalAddress(), proxy2.lastLocalAddress()), new HashSet<>(res)); } @Test @@ -320,7 +320,7 @@ public void testHttpProxyPooling2() throws Exception { .setHost("localhost") .setPort(proxy.port()); List res = testPooling(req, req, proxy); - assertEquals(proxy.localAddresses(), res); + assertEquals(new HashSet<>(proxy.localAddresses()), new HashSet<>(res)); } @Test @@ -359,7 +359,7 @@ public void testHttpProxyAuthPooling2() throws Exception { .setPort(proxy.port()); List res = testPooling(req1, req2, proxy); assertEquals(2, proxy.localAddresses().size()); - assertEquals(proxy.localAddresses(), res); + assertEquals(new HashSet<>(proxy.localAddresses()), new HashSet<>(res)); } @Test @@ -375,7 +375,7 @@ public void testSocksProxyPooling1() throws Exception { .setHost("localhost") .setPort(proxy2.port()); List res = testPooling(req1, req2, proxy1, proxy2); - assertEquals(Arrays.asList(proxy1.lastLocalAddress(), proxy2.lastLocalAddress()), res); + assertEquals(Set.of(proxy1.lastLocalAddress(), proxy2.lastLocalAddress()), new HashSet<>(res)); } @Test @@ -386,7 +386,7 @@ public void testSocksProxyPooling2() throws Exception { .setHost("localhost") .setPort(proxy.port()); List res = testPooling(req, req, proxy); - assertEquals(proxy.localAddresses(), res); + assertEquals(new HashSet<>(proxy.localAddresses()), new HashSet<>(res)); } @Test @@ -405,7 +405,7 @@ public void testSocksProxyAuthPooling1() throws Exception { .setHost("localhost") .setPort(proxy.port()); List res = testPooling(req1, req2, proxy); - assertEquals(proxy.localAddresses(), res); + assertEquals(new HashSet<>(proxy.localAddresses()), new HashSet<>(res)); } @Test @@ -425,7 +425,7 @@ public void testSocksProxyAuthPooling2() throws Exception { .setPort(proxy.port()); List res = testPooling(req1, req2, proxy); assertEquals(2, proxy.localAddresses().size()); - assertEquals(proxy.localAddresses(), res); + assertEquals(new HashSet<>(proxy.localAddresses()), new HashSet<>(res)); } public List testPooling(ProxyOptions request1, ProxyOptions request2, TestProxyBase... proxies) throws Exception { @@ -434,7 +434,7 @@ public List testPooling(ProxyOptions request1, ProxyOptions request2, Te } client.close(); - client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true), new PoolOptions().setHttp2MaxSize(2)); + client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true), new PoolOptions().setHttp1MaxSize(2)); CompletableFuture> ret = new CompletableFuture<>(); @@ -448,24 +448,34 @@ public List testPooling(ProxyOptions request1, ProxyOptions request2, Te request.response().end("" + addr); }); } - }).listen().onComplete(onSuccess(s -> { - RequestOptions baseOptions = new RequestOptions() - .setHost(DEFAULT_HTTP_HOST) - .setPort(DEFAULT_HTTP_PORT) - .setURI("/"); - List responses = new ArrayList<>(); - for (int i = 0;i < 2;i++) { - client.request(new RequestOptions(baseOptions).setProxyOptions(i == 0 ? request1 : request2)) - .compose(HttpClientRequest::send) - .compose(HttpClientResponse::body) - .onComplete(onSuccess(res2 -> { - responses.add(res2.toString()); - if (responses.size() == 2) { - ret.complete(responses); - } - })); - } - })); + }).listen() + .await(); + + RequestOptions baseOptions = new RequestOptions() + .setHost(DEFAULT_HTTP_HOST) + .setPort(DEFAULT_HTTP_PORT) + .setURI("/"); + List> clientRequests = new ArrayList<>(); + for (int i = 0;i < 2;i++) { + Future request = client + .request(new RequestOptions(baseOptions).setProxyOptions(i == 0 ? request1 : request2)); + clientRequests.add(request); + // Avoid races with the proxy username provider + request.await(); + } + List responses = new ArrayList<>(); + for (int i = 0;i < 2;i++) { + clientRequests.get(i) + .compose(HttpClientRequest::send) + .expecting(HttpResponseExpectation.SC_OK) + .compose(HttpClientResponse::body) + .onComplete(onSuccess(res2 -> { + responses.add(res2.toString()); + if (responses.size() == 2) { + ret.complete(responses); + } + })); + } return ret.get(40, TimeUnit.SECONDS); } finally { diff --git a/vertx-core/src/test/java/io/vertx/tests/http/Http2Test.java b/vertx-core/src/test/java/io/vertx/tests/http/Http2Test.java index bc339f84655..39f5b332502 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/Http2Test.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/Http2Test.java @@ -901,18 +901,12 @@ public void testConnectionCloseEvictsConnectionFromThePoolBeforeStreamsAreClosed .compose(HttpClientResponse::body)); f1.onComplete(onSuccess(v -> { Future f2 = client.request(new RequestOptions(requestOptions).setURI("/2")) - .compose(req -> { - System.out.println(req.connection()); - return req.send() - .compose(HttpClientResponse::body); - }); + .compose(req -> req.send() + .compose(HttpClientResponse::body)); f2.onComplete(onFailure(v2 -> { Future f3 = client.request(new RequestOptions(requestOptions).setURI("/3")) - .compose(req -> { - System.out.println(req.connection()); - return req.send() - .compose(HttpClientResponse::body); - }); + .compose(req -> req.send() + .compose(HttpClientResponse::body)); f3.onComplete(onSuccess(vvv -> { testComplete(); })); diff --git a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java index b932b6b19b3..ec45aeb927c 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java @@ -69,6 +69,7 @@ import io.vertx.core.net.NetServer; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; +import io.vertx.core.transport.Transport; import io.vertx.test.core.CheckingSender; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; @@ -119,6 +120,7 @@ import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST_AND_PORT; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_PORT; +import static org.junit.Assume.assumeTrue; /** * @author Tim Fox @@ -580,7 +582,6 @@ public void testHandleWSManually() throws Exception { @Test public void testSharedServersRoundRobin() throws Exception { - int numServers = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2- 1; int numConnections = numServers * 100; diff --git a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java index f23f6c24c67..14dec4106f6 100644 --- a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java @@ -25,6 +25,7 @@ import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.impl.VertxConnection; import io.vertx.core.net.impl.VertxHandler; +import io.vertx.core.transport.Transport; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import org.junit.Ignore; @@ -41,6 +42,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import static org.junit.Assume.assumeTrue; + public class ConnectionBaseTest extends VertxTestBase { private NetClient client; @@ -287,23 +290,26 @@ public void testDrainReentrancy() throws Exception { chctx.pipeline().addBefore("handler", "myhandler", new ChannelDuplexHandler() { int reentrant; @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { assertEquals(0, reentrant++); - switch (msg.toString()) { - case "msg1": - // Fill outbound buffer - assertTrue(ctx.channel().isWritable()); - ctx.write(BufferInternal.buffer(TestUtils.randomAlphaString((int)ctx.channel().bytesBeforeUnwritable())).getByteBuf()); - assertFalse(ctx.channel().isWritable()); - // Flush to trigger writability change - ctx.flush(); - assertTrue(ctx.channel().isWritable()); - break; - case "msg2": - testComplete(); - break; + try { + switch (msg.toString()) { + case "msg1": + // Fill outbound buffer + assertTrue(ctx.channel().isWritable()); + ChannelFuture f = ctx.write(BufferInternal.buffer(TestUtils.randomAlphaString((int) ctx.channel().bytesBeforeUnwritable())).getByteBuf()); + assertFalse(ctx.channel().isWritable()); + // Flush to trigger writability change + ctx.flush(); + assertEquals(TRANSPORT != Transport.IO_URING, ctx.channel().isWritable()); + break; + case "msg2": + testComplete(); + break; + } + } finally { + reentrant--; } - reentrant--; } }); diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java index c47f12b35e3..445042b4758 100644 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetBandwidthLimitingTest.java @@ -54,7 +54,7 @@ public class NetBandwidthLimitingTest extends VertxTestBase { public void setUp() throws Exception { super.setUp(); if (USE_DOMAIN_SOCKETS) { - assertTrue("Native transport not enabled", USE_NATIVE_TRANSPORT); + assertTrue("Native transport not enabled", TRANSPORT.implementation().supportsDomainSockets()); File tmp = TestUtils.tmpFile(".sock"); testAddress = SocketAddress.domainSocketAddress(tmp.getAbsolutePath()); } else { diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index 433b57e794c..fd077d58cd6 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -111,7 +111,7 @@ public class NetTest extends VertxTestBase { public void setUp() throws Exception { super.setUp(); if (USE_DOMAIN_SOCKETS) { - assertTrue("Native transport not enabled", USE_NATIVE_TRANSPORT); + assertTrue("Native transport not enabled", TRANSPORT.implementation().supportsDomainSockets()); tmp = TestUtils.tmpFile(".sock"); testAddress = SocketAddress.domainSocketAddress(tmp.getAbsolutePath()); } else { @@ -2100,7 +2100,7 @@ public void testSharedServersRoundRobinButFirstStartAndStopServer() throws Excep @Test public void testClosingVertxCloseSharedServers() throws Exception { int numServers = 2; - Vertx vertx = Vertx.vertx(getOptions()); + Vertx vertx = createVertx(getOptions()); List servers = new ArrayList<>(); for (int i = 0;i < numServers;i++) { NetServer server = vertx.createNetServer().connectHandler(so -> { diff --git a/vertx-core/src/test/java/io/vertx/tests/tls/HttpTLSTest.java b/vertx-core/src/test/java/io/vertx/tests/tls/HttpTLSTest.java index 20e41f9b26a..5e3e7cf9835 100755 --- a/vertx-core/src/test/java/io/vertx/tests/tls/HttpTLSTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/tls/HttpTLSTest.java @@ -12,6 +12,7 @@ package io.vertx.tests.tls; import static org.hamcrest.core.StringEndsWith.endsWith; +import static org.junit.Assume.assumeTrue; import java.io.*; import java.lang.reflect.UndeclaredThrowableException; @@ -39,6 +40,8 @@ import io.vertx.core.impl.VertxThread; import io.vertx.core.net.*; import io.vertx.core.net.impl.KeyStoreHelper; +import io.vertx.core.transport.Transport; +import io.vertx.test.core.Repeat; import io.vertx.test.http.HttpTestBase; import org.junit.Assume; import org.junit.Ignore; @@ -319,7 +322,7 @@ public void testTLSMatchingProtocolVersions() throws Exception { @Test // Provide an host name with a trailing dot public void testTLSTrailingDotHost() throws Exception { - Assume.assumeTrue(PlatformDependent.javaVersion() < 9); + assumeTrue(PlatformDependent.javaVersion() < 9); // We just need a vanilla cert for this test SelfSignedCertificate cert = SelfSignedCertificate.create("host2.com"); TLSTest test = testTLS(Cert.NONE, cert::trustOptions, cert::keyCertOptions, Trust.NONE) @@ -1283,9 +1286,7 @@ TLSTest run(boolean shouldPass) { server.connectionHandler(conn -> complete()); AtomicInteger count = new AtomicInteger(); server.exceptionHandler(err -> { - if (shouldPass) { - HttpTLSTest.this.fail(err); - } else { + if (!shouldPass) { if (count.incrementAndGet() == 1) { complete(); } diff --git a/vertx-core/src/test/java/io/vertx/tests/vertx/GlobalEventExecutorNotificationTest.java b/vertx-core/src/test/java/io/vertx/tests/vertx/GlobalEventExecutorNotificationTest.java index d20eb66dbee..a1a5b500e56 100644 --- a/vertx-core/src/test/java/io/vertx/tests/vertx/GlobalEventExecutorNotificationTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/vertx/GlobalEventExecutorNotificationTest.java @@ -19,7 +19,7 @@ import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.ProxyOptions; import io.vertx.core.net.ProxyType; -import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.test.core.AsyncTestBase; import org.junit.After; import org.junit.Test; @@ -55,7 +55,7 @@ public void testProxyConnectError() { private void testConnectErrorNotifiesOnEventLoop(NetClientOptions options) { RuntimeException cause = new RuntimeException(); - vertx = VertxBootstrap.create().transport(new JDKTransport() { + vertx = VertxBootstrap.create().transport(new NioTransport() { @Override public ChannelFactory channelFactory(boolean domainSocket) { return (ChannelFactory) () -> { @@ -78,7 +78,7 @@ public ChannelFactory channelFactory(boolean domainSocket) { @Test public void testNetBindError() { RuntimeException cause = new RuntimeException(); - vertx = VertxBootstrap.create().transport(new JDKTransport() { + vertx = VertxBootstrap.create().transport(new NioTransport() { @Override public ChannelFactory serverChannelFactory(boolean domainSocket) { return (ChannelFactory) () -> { @@ -98,7 +98,7 @@ public ChannelFactory serverChannelFactory(boolean doma @Test public void testHttpBindError() { RuntimeException cause = new RuntimeException(); - vertx = VertxBootstrap.create().transport(new JDKTransport() { + vertx = VertxBootstrap.create().transport(new NioTransport() { @Override public ChannelFactory serverChannelFactory(boolean domainSocket) { return (ChannelFactory) () -> { diff --git a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java index 5f071381574..c257fec8e66 100644 --- a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java @@ -16,7 +16,7 @@ import io.vertx.core.internal.VertxBootstrap; import io.vertx.core.internal.VertxInternal; import io.vertx.core.metrics.MetricsOptions; -import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.spi.transport.Transport; import io.vertx.core.spi.ExecutorServiceFactory; import io.vertx.core.spi.VertxMetricsFactory; @@ -155,8 +155,8 @@ public void testFactoryClusterManagerOverridesMetaInf() throws Exception { @Test public void testFactoryTransportOverridesDefault() { VertxBootstrap factory = VertxBootstrap.create(); - // JDK transport - Transport override = new JDKTransport() { + // NIO transport + Transport override = new NioTransport() { }; factory.transport(override); factory.init(); diff --git a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxStartFailureTest.java b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxStartFailureTest.java index 0b098ae1f52..59d0a5b58c3 100644 --- a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxStartFailureTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxStartFailureTest.java @@ -16,7 +16,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.internal.VertxBootstrap; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.transport.Transport; @@ -107,7 +107,7 @@ public void nodeListener(NodeListener listener) { private Throwable failStart(VertxOptions options, ClusterManager clusterManager) throws Exception { List loops = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(1); - Transport transport = new JDKTransport() { + Transport transport = new NioTransport() { @Override public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) { EventLoopGroup eventLoop = super.eventLoopGroup(type, nThreads, threadFactory, ioRatio);