From d542a61cb1d864a858a63db7225286f05b5f32e1 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Wed, 19 Apr 2023 14:34:16 +0200 Subject: [PATCH] Fix memory leak with one-shot connection factories (#1311) Fixes #1302 After creating an unpooled connection, add it as a close hook to the creating context. Also, in this case, make sure the connection factory is closed after usage. Signed-off-by: Thomas Segismont --- .../db2client/impl/DB2ConnectionImpl.java | 3 +- .../mssqlclient/impl/MSSQLConnectionImpl.java | 5 +- .../mysqlclient/impl/MySQLConnectionImpl.java | 5 +- .../impl/OracleConnectionImpl.java | 5 +- .../vertx/pgclient/impl/PgConnectionImpl.java | 12 +-- .../vertx/pgclient/PgConnectionTestBase.java | 27 +----- .../pgclient/PgPooledConnectionTest.java | 6 +- .../sqlclient/impl/SqlConnectionBase.java | 31 ++++++- .../sqlclient/tck/ConnectionTestBase.java | 83 +++++++++++++++++-- 9 files changed, 116 insertions(+), 61 deletions(-) diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java index 4d67a229b..94def3b58 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java @@ -36,8 +36,7 @@ public static Future connect(Vertx vertx, DB2ConnectOptions optio } catch (Exception e) { return ctx.failedFuture(e); } - ctx.addCloseHook(client); - return (Future) client.connect(ctx, options); + return prepareForClose(ctx, client.connect(ctx, options)).map(DB2Connection::cast); } public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) { diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java index 4fac45cdc..6853b5740 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -35,8 +35,7 @@ public MSSQLConnectionImpl(ContextInternal context, ConnectionFactory factory, C public static Future connect(Vertx vertx, MSSQLConnectOptions options) { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx.owner()); - ctx.addCloseHook(client); - return (Future)client.connect(ctx, options); + return prepareForClose(ctx, client.connect(ctx, options)).map(MSSQLConnection::cast); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java index 94d02ba45..d05c79866 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -36,8 +36,7 @@ public static Future connect(ContextInternal ctx, MySQLConnectO } catch (Exception e) { return ctx.failedFuture(e); } - ctx.addCloseHook(client); - return (Future)client.connect(ctx, options); + return prepareForClose(ctx, client.connect(ctx, options)).map(MySQLConnection::cast); } public MySQLConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) { diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionImpl.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionImpl.java index 779dd9de0..a20ab8063 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionImpl.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -30,7 +30,6 @@ public OracleConnectionImpl(ContextInternal context, ConnectionFactory factory, public static Future connect(Vertx vertx, OracleConnectOptions options) { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); OracleConnectionFactory client = new OracleConnectionFactory(ctx.owner()); - ctx.addCloseHook(client); - return (Future) client.connect(ctx, options); + return prepareForClose(ctx, client.connect(ctx, options)).map(OracleConnection::cast); } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 3553f1c71..12a029162 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -16,24 +16,19 @@ */ package io.vertx.pgclient.impl; +import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; -import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgNotice; import io.vertx.pgclient.PgNotification; import io.vertx.pgclient.impl.codec.NoticeResponse; +import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.Notification; import io.vertx.sqlclient.impl.SocketConnectionBase; import io.vertx.sqlclient.impl.SqlConnectionBase; -import io.vertx.core.AsyncResult; -import io.vertx.core.Context; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.pgclient.impl.codec.TxFailedEvent; import java.util.function.Supplier; @@ -46,8 +41,7 @@ public static Future connect(ContextInternal context, Supplier notificationHandler; diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionTestBase.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionTestBase.java index f34d65a43..2eb37191c 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionTestBase.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgConnectionTestBase.java @@ -17,18 +17,11 @@ package io.vertx.pgclient; -import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; -import io.vertx.sqlclient.ProxyServer; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.SqlConnection; -import io.vertx.sqlclient.TransactionRollbackException; -import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.*; import org.junit.Test; import java.util.ArrayList; @@ -319,24 +312,6 @@ public void testBatchInsertError(TestContext ctx) throws Exception { })); } - @Test - public void testCloseOnUndeploy(TestContext ctx) { - Async done = ctx.async(); - vertx.deployVerticle(new AbstractVerticle() { - @Override - public void start(Promise startPromise) throws Exception { - connector.accept(ctx.asyncAssertSuccess(conn -> { - conn.closeHandler(v -> { - done.complete(); - }); - startPromise.complete(); - })); - } - }).onComplete(ctx.asyncAssertSuccess(id -> { - vertx.undeploy(id); - })); - } - @Test public void testTransactionCommit(TestContext ctx) { testTransactionCommit(ctx, Runnable::run); diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPooledConnectionTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPooledConnectionTest.java index b21387a3c..f58536f19 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPooledConnectionTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPooledConnectionTest.java @@ -43,11 +43,7 @@ public void tearDown(TestContext ctx) { if (pool != null) { PgPool p = pool; pool = null; - try { - p.close(); - } catch (IllegalStateException e) { - // Might be already closed because of testCloseOnUndeploy - } + p.close(); } super.tearDown(ctx); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java index 2387d25f5..e7423ec17 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java @@ -17,15 +17,18 @@ package io.vertx.sqlclient.impl; +import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.sqlclient.PrepareOptions; import io.vertx.sqlclient.PreparedStatement; +import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.Transaction; -import io.vertx.sqlclient.impl.command.*; -import io.vertx.core.*; +import io.vertx.sqlclient.impl.command.CommandBase; +import io.vertx.sqlclient.impl.command.PrepareStatementCommand; +import io.vertx.sqlclient.impl.command.QueryCommandBase; import io.vertx.sqlclient.impl.pool.SqlConnectionPool; import io.vertx.sqlclient.impl.tracing.QueryReporter; import io.vertx.sqlclient.spi.ConnectionFactory; @@ -35,10 +38,11 @@ /** * @author Julien Viet */ -public class SqlConnectionBase> extends SqlClientBase implements SqlConnectionInternal { +public class SqlConnectionBase> extends SqlClientBase implements SqlConnectionInternal, Closeable { private volatile Handler exceptionHandler; private volatile Handler closeHandler; + private volatile boolean closeFactoryAfterUsage; protected TransactionImpl tx; protected final ContextInternal context; protected final ConnectionFactory factory; @@ -192,7 +196,16 @@ public Future close() { return promise.future(); } - private void close(Promise promise) { + @Override + public void close(Promise completion) { + doClose(completion); + if (closeFactoryAfterUsage) { + completion.future().onComplete(v -> factory.close(Promise.promise())); + context.removeCloseHook(this); + } + } + + private void doClose(Promise promise) { context.execute(promise, p -> { if (tx != null) { tx.rollback(ar -> conn.close(this, p)); @@ -202,4 +215,14 @@ private void close(Promise promise) { } }); } + + protected static Future prepareForClose(ContextInternal ctx, Future future) { + return future.andThen(ar -> { + if (ar.succeeded()) { + SqlConnectionBase base = (SqlConnectionBase) ar.result(); + base.closeFactoryAfterUsage = true; + ctx.addCloseHook(base); + } + }); + } } diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/ConnectionTestBase.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/ConnectionTestBase.java index 36351b0d7..05657154f 100644 --- a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/ConnectionTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/ConnectionTestBase.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -11,19 +11,24 @@ package io.vertx.sqlclient.tck; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; +import io.vertx.core.*; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.SqlConnectionBase; +import io.vertx.sqlclient.spi.ConnectionFactory; import io.vertx.sqlclient.spi.DatabaseMetadata; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; + +import static java.util.concurrent.TimeUnit.SECONDS; + public abstract class ConnectionTestBase { protected Vertx vertx; protected Connector connector; @@ -46,7 +51,73 @@ public void tearDown(TestContext ctx) { @Test public void testConnect(TestContext ctx) { - connect(ctx.asyncAssertSuccess(conn -> { + connect(ctx.asyncAssertSuccess()); + } + + @Test + public void testConnectNoLeak(TestContext ctx) throws Exception { + Set> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); + Set factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); + Async async = ctx.async(100); + for (int i = 0; i < 100; i++) { + connect(ctx.asyncAssertSuccess(conn -> { + SqlConnectionBase base = (SqlConnectionBase) conn; + connections.add(base); + factories.add(base.factory()); + conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown())); + })); + } + async.awaitSuccess(); + for (int c = 0; c < 5; c++) { + System.gc(); + SECONDS.sleep(1); + } + ctx.assertEquals(0, connections.size()); + ctx.assertEquals(0, factories.size()); + } + + @Test + public void testConnectNoLeakInVerticle(TestContext ctx) throws Exception { + Set> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); + Set factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); + Async async = ctx.async(100); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() throws Exception { + for (int i = 0; i < 100; i++) { + connect(ctx.asyncAssertSuccess(conn -> { + SqlConnectionBase base = (SqlConnectionBase) conn; + connections.add(base); + factories.add(base.factory()); + conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown())); + })); + } + } + }); + async.awaitSuccess(); + for (int c = 0; c < 5; c++) { + System.gc(); + SECONDS.sleep(1); + } + ctx.assertEquals(0, connections.size()); + ctx.assertEquals(0, factories.size()); + } + + @Test + public void testCloseOnUndeploy(TestContext ctx) { + Async done = ctx.async(); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start(Promise startPromise) throws Exception { + connect(ctx.asyncAssertSuccess(conn -> { + conn.closeHandler(v -> { + done.complete(); + }); + startPromise.complete(); + })); + } + }).onComplete(ctx.asyncAssertSuccess(id -> { + vertx.undeploy(id); })); }