Skip to content

Commit

Permalink
Fix memory leak with one-shot connection factories (#1311)
Browse files Browse the repository at this point in the history
Fixes #1302

After creating an unpooled connection, add it as a close hook to the creating context.
Also, in this case, make sure the connection factory is closed after usage.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont committed Apr 20, 2023
1 parent 769a6d2 commit d542a61
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public static Future<DB2Connection> connect(Vertx vertx, DB2ConnectOptions optio
} catch (Exception e) {
return ctx.failedFuture(e);
}
ctx.addCloseHook(client);
return (Future) client.connect(ctx, options);
return prepareForClose(ctx, client.connect(ctx, options)).map(DB2Connection::cast);
}

public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand Down Expand Up @@ -35,8 +35,7 @@ public MSSQLConnectionImpl(ContextInternal context, ConnectionFactory factory, C
public static Future<MSSQLConnection> connect(Vertx vertx, MSSQLConnectOptions options) {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx.owner());
ctx.addCloseHook(client);
return (Future)client.connect(ctx, options);
return prepareForClose(ctx, client.connect(ctx, options)).map(MSSQLConnection::cast);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand Down Expand Up @@ -36,8 +36,7 @@ public static Future<MySQLConnection> connect(ContextInternal ctx, MySQLConnectO
} catch (Exception e) {
return ctx.failedFuture(e);
}
ctx.addCloseHook(client);
return (Future)client.connect(ctx, options);
return prepareForClose(ctx, client.connect(ctx, options)).map(MySQLConnection::cast);
}

public MySQLConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand Down Expand Up @@ -30,7 +30,6 @@ public OracleConnectionImpl(ContextInternal context, ConnectionFactory factory,
public static Future<OracleConnection> connect(Vertx vertx, OracleConnectOptions options) {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
OracleConnectionFactory client = new OracleConnectionFactory(ctx.owner());
ctx.addCloseHook(client);
return (Future) client.connect(ctx, options);
return prepareForClose(ctx, client.connect(ctx, options)).map(OracleConnection::cast);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,19 @@
*/
package io.vertx.pgclient.impl;

import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.pgclient.impl.codec.TxFailedEvent;

import java.util.function.Supplier;

Expand All @@ -46,8 +41,7 @@ public static Future<PgConnection> connect(ContextInternal context, Supplier<PgC
} catch (Exception e) {
return context.failedFuture(e);
}
context.addCloseHook(client);
return (Future) client.connect(context, options.get());
return prepareForClose(context, client.connect(context, options.get())).map(PgConnection::cast);
}

private volatile Handler<PgNotification> notificationHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package io.vertx.pgclient;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.sqlclient.ProxyServer;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.TransactionRollbackException;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.*;
import org.junit.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -319,24 +312,6 @@ public void testBatchInsertError(TestContext ctx) throws Exception {
}));
}

@Test
public void testCloseOnUndeploy(TestContext ctx) {
Async done = ctx.async();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) throws Exception {
connector.accept(ctx.asyncAssertSuccess(conn -> {
conn.closeHandler(v -> {
done.complete();
});
startPromise.complete();
}));
}
}).onComplete(ctx.asyncAssertSuccess(id -> {
vertx.undeploy(id);
}));
}

@Test
public void testTransactionCommit(TestContext ctx) {
testTransactionCommit(ctx, Runnable::run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ public void tearDown(TestContext ctx) {
if (pool != null) {
PgPool p = pool;
pool = null;
try {
p.close();
} catch (IllegalStateException e) {
// Might be already closed because of testCloseOnUndeploy
}
p.close();
}
super.tearDown(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

package io.vertx.sqlclient.impl;

import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.sqlclient.PrepareOptions;
import io.vertx.sqlclient.PreparedStatement;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;
import io.vertx.sqlclient.impl.command.*;
import io.vertx.core.*;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.command.PrepareStatementCommand;
import io.vertx.sqlclient.impl.command.QueryCommandBase;
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
import io.vertx.sqlclient.impl.tracing.QueryReporter;
import io.vertx.sqlclient.spi.ConnectionFactory;
Expand All @@ -35,10 +38,11 @@
/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class SqlConnectionBase<C extends SqlConnectionBase<C>> extends SqlClientBase implements SqlConnectionInternal {
public class SqlConnectionBase<C extends SqlConnectionBase<C>> extends SqlClientBase implements SqlConnectionInternal, Closeable {

private volatile Handler<Throwable> exceptionHandler;
private volatile Handler<Void> closeHandler;
private volatile boolean closeFactoryAfterUsage;
protected TransactionImpl tx;
protected final ContextInternal context;
protected final ConnectionFactory factory;
Expand Down Expand Up @@ -192,7 +196,16 @@ public Future<Void> close() {
return promise.future();
}

private void close(Promise<Void> promise) {
@Override
public void close(Promise<Void> completion) {
doClose(completion);
if (closeFactoryAfterUsage) {
completion.future().onComplete(v -> factory.close(Promise.promise()));
context.removeCloseHook(this);
}
}

private void doClose(Promise<Void> promise) {
context.execute(promise, p -> {
if (tx != null) {
tx.rollback(ar -> conn.close(this, p));
Expand All @@ -202,4 +215,14 @@ private void close(Promise<Void> promise) {
}
});
}

protected static Future<SqlConnection> prepareForClose(ContextInternal ctx, Future<SqlConnection> future) {
return future.andThen(ar -> {
if (ar.succeeded()) {
SqlConnectionBase<?> base = (SqlConnectionBase<?>) ar.result();
base.closeFactoryAfterUsage = true;
ctx.addCloseHook(base);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -11,19 +11,24 @@

package io.vertx.sqlclient.tck;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.SqlConnectionBase;
import io.vertx.sqlclient.spi.ConnectionFactory;
import io.vertx.sqlclient.spi.DatabaseMetadata;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;

import static java.util.concurrent.TimeUnit.SECONDS;

public abstract class ConnectionTestBase {
protected Vertx vertx;
protected Connector<SqlConnection> connector;
Expand All @@ -46,7 +51,73 @@ public void tearDown(TestContext ctx) {

@Test
public void testConnect(TestContext ctx) {
connect(ctx.asyncAssertSuccess(conn -> {
connect(ctx.asyncAssertSuccess());
}

@Test
public void testConnectNoLeak(TestContext ctx) throws Exception {
Set<SqlConnectionBase<?>> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
Set<ConnectionFactory> factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
Async async = ctx.async(100);
for (int i = 0; i < 100; i++) {
connect(ctx.asyncAssertSuccess(conn -> {
SqlConnectionBase<?> base = (SqlConnectionBase<?>) conn;
connections.add(base);
factories.add(base.factory());
conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown()));
}));
}
async.awaitSuccess();
for (int c = 0; c < 5; c++) {
System.gc();
SECONDS.sleep(1);
}
ctx.assertEquals(0, connections.size());
ctx.assertEquals(0, factories.size());
}

@Test
public void testConnectNoLeakInVerticle(TestContext ctx) throws Exception {
Set<SqlConnectionBase<?>> connections = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
Set<ConnectionFactory> factories = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
Async async = ctx.async(100);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
for (int i = 0; i < 100; i++) {
connect(ctx.asyncAssertSuccess(conn -> {
SqlConnectionBase<?> base = (SqlConnectionBase<?>) conn;
connections.add(base);
factories.add(base.factory());
conn.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown()));
}));
}
}
});
async.awaitSuccess();
for (int c = 0; c < 5; c++) {
System.gc();
SECONDS.sleep(1);
}
ctx.assertEquals(0, connections.size());
ctx.assertEquals(0, factories.size());
}

@Test
public void testCloseOnUndeploy(TestContext ctx) {
Async done = ctx.async();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) throws Exception {
connect(ctx.asyncAssertSuccess(conn -> {
conn.closeHandler(v -> {
done.complete();
});
startPromise.complete();
}));
}
}).onComplete(ctx.asyncAssertSuccess(id -> {
vertx.undeploy(id);
}));
}

Expand Down

0 comments on commit d542a61

Please sign in to comment.