Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connection maximum lifetime to recycle connections regularly #1298

Merged
merged 2 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void testPipeliningDistribution(TestContext ctx) {
public void testPoolIdleTimeout(TestContext ctx) {
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
int pooleCleanerPeriod = 100;
int poolCleanerPeriod = 100;
int idleTimeout = 3000;
Async latch = ctx.async();
proxy.proxyHandler(conn -> {
Expand All @@ -393,8 +393,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
conn.clientCloseHandler(v -> {
long lifetime = System.currentTimeMillis() - now;
int delta = 500;
int lowerBound = idleTimeout - pooleCleanerPeriod - delta;
int upperBound = idleTimeout + pooleCleanerPeriod + delta;
int lowerBound = idleTimeout - poolCleanerPeriod - delta;
int upperBound = idleTimeout + poolCleanerPeriod + delta;
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
latch.complete();
Expand All @@ -408,7 +408,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
listenLatch.awaitSuccess(20_000);

poolOptions
.setPoolCleanerPeriod(pooleCleanerPeriod)
.setPoolCleanerPeriod(poolCleanerPeriod)
.setMaxLifetime(0)
.setIdleTimeout(idleTimeout)
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
options.setPort(8080);
Expand All @@ -422,6 +423,49 @@ public void testPoolIdleTimeout(TestContext ctx) {
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void testPoolMaxLifetime(TestContext ctx) {
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
int poolCleanerPeriod = 100;
int maxLifetime = 3000;
Async latch = ctx.async();
proxy.proxyHandler(conn -> {
proxyConn.set(conn);
long now = System.currentTimeMillis();
conn.clientCloseHandler(v -> {
long lifetime = System.currentTimeMillis() - now;
int delta = 500;
int lowerBound = maxLifetime - poolCleanerPeriod - delta;
int upperBound = maxLifetime + poolCleanerPeriod + delta;
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
latch.complete();
});
conn.connect();
});

// Start proxy
Async listenLatch = ctx.async();
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
listenLatch.awaitSuccess(20_000);

poolOptions
.setPoolCleanerPeriod(poolCleanerPeriod)
.setIdleTimeout(0)
.setMaxLifetime(maxLifetime)
.setMaxLifetimeUnit(TimeUnit.MILLISECONDS);
options.setPort(8080);
options.setHost("localhost");
PgPool pool = createPool(options, poolOptions);

// Create a connection that remains in the pool
pool
.getConnection()
.flatMap(SqlClient::close)
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void testPoolConnectTimeout(TestContext ctx) {
Async async = ctx.async(2);
Expand Down Expand Up @@ -463,9 +507,9 @@ public void testPoolConnectTimeout(TestContext ctx) {
public void testNoConnectionLeaks(TestContext ctx) {
Async killConnections = ctx.async();
PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
Collector<Row, ?, List<Integer>> collector = mapping(row -> row.getInteger(0), toList());
Collector<Row, ?, List<Boolean>> collector = mapping(row -> row.getBoolean(0), toList());
String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1";
PreparedQuery<SqlResult<List<Integer>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
PreparedQuery<SqlResult<List<Boolean>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
Tuple params = Tuple.of(options.getDatabase());
preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete()));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "maxLifetime":
if (member.getValue() instanceof Number) {
obj.setMaxLifetime(((Number)member.getValue()).intValue());
}
break;
case "maxLifetimeUnit":
if (member.getValue() instanceof String) {
obj.setMaxLifetimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "maxSize":
if (member.getValue() instanceof Number) {
obj.setMaxSize(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -88,6 +98,10 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
if (obj.getIdleTimeoutUnit() != null) {
json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name());
}
json.put("maxLifetime", obj.getMaxLifetime());
if (obj.getMaxLifetimeUnit() != null) {
json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name());
}
json.put("maxSize", obj.getMaxSize());
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
if (obj.getName() != null) {
Expand Down
55 changes: 52 additions & 3 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.vertx.sqlclient;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;

Expand Down Expand Up @@ -48,11 +47,21 @@ public class PoolOptions {
*/
public static final int DEFAULT_IDLE_TIMEOUT = 0;

/**
* Default maximum pooled connection lifetime = 0 (no maximum)
*/
public static final int DEFAULT_MAXIMUM_LIFETIME = 0;

/**
* Default connection idle time unit in the pool = seconds
*/
public static final TimeUnit DEFAULT_IDLE_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;

/**
* Default maximum pooled connection lifetime unit = seconds
*/
public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS;

/**
* Default pool cleaner period = 1000 ms (1 second)
*/
Expand Down Expand Up @@ -87,6 +96,8 @@ public class PoolOptions {
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
private TimeUnit idleTimeoutUnit = DEFAULT_IDLE_TIMEOUT_TIME_UNIT;
private int maxLifetime = DEFAULT_MAXIMUM_LIFETIME;
private TimeUnit maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT;
private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD;
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT;
Expand Down Expand Up @@ -177,16 +188,54 @@ public int getIdleTimeout() {
}

/**
* Establish an idle timeout for pooled connections.
* Establish an idle timeout for pooled connections, a value of zero disables the idle timeout.
*
* @param idleTimeout the pool connection idle time unitq
* @param idleTimeout the pool connection idle timeout
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setIdleTimeout(int idleTimeout) {
Arguments.require(idleTimeout >= 0, "idleTimeout must be >= 0");
this.idleTimeout = idleTimeout;
return this;
}

/**
* @return the pooled connection max lifetime unit
*/
public TimeUnit getMaxLifetimeUnit() {
return maxLifetimeUnit;
}

/**
* Establish a max lifetime unit for pooled connections.
*
* @param maxLifetimeUnit pooled connection max lifetime unit
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) {
this.maxLifetimeUnit = maxLifetimeUnit;
return this;
}

/**
* @return pooled connection max lifetime
*/
public int getMaxLifetime() {
return maxLifetime;
}

/**
* Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime.
*
* @param maxLifetime the pool connection max lifetime
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMaxLifetime(int maxLifetime) {
Arguments.require(maxLifetime >= 0, "maxLifetime must be >= 0");
this.maxLifetime = maxLifetime;
kdubb marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/**
* @return the connection pool cleaner period in ms.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private final CloseFuture closeFuture;
private final long idleTimeout;
private final long connectionTimeout;
private final long maxLifetime;
private final long cleanerPeriod;
private final int pipeliningLimit;
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
Expand All @@ -66,20 +67,23 @@ public PoolImpl(VertxInternal vertx,

this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit());
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
this.timerID = -1L;
this.pipeliningLimit = pipeliningLimit;
this.vertx = vertx;
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, poolOptions.getMaxSize(), pipeliningLimit, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer,
afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipeliningLimit,
poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
}

public Pool init() {
closeFuture.add(this);
if (idleTimeout > 0 && cleanerPeriod > 0) {
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
synchronized (this) {
timerID = vertx.setTimer(cleanerPeriod, id -> {
checkExpired();
runEviction();
});
}
}
Expand All @@ -94,17 +98,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
return this;
}

private void checkExpired() {
private void runEviction() {
synchronized (this) {
if (timerID == -1) {
// Cancelled
return;
}
timerID = vertx.setTimer(cleanerPeriod, id -> {
checkExpired();
runEviction();
});
}
pool.checkExpired();
pool.evict();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SqlConnectionPool {
private final Function<Connection, Future<Void>> beforeRecycle;
private final int pipeliningLimit;
private final long idleTimeout;
private final long maxLifetime;
private final int maxSize;

public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
Expand All @@ -56,6 +57,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
Function<Connection, Future<Void>> beforeRecycle,
VertxInternal vertx,
long idleTimeout,
long maxLifetime,
int maxSize,
int pipeliningLimit,
int maxWaitQueueSize,
Expand All @@ -73,6 +75,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
this.vertx = vertx;
this.pipeliningLimit = pipeliningLimit;
this.idleTimeout = idleTimeout;
this.maxLifetime = maxLifetime;
this.maxSize = maxSize;
this.hook = hook;
this.connectionProvider = connectionProvider;
Expand Down Expand Up @@ -140,9 +143,9 @@ public int size() {
return pool.size();
}

public void checkExpired() {
public void evict() {
long now = System.currentTimeMillis();
pool.evict(conn -> conn.expirationTimestamp < now, ar -> {
pool.evict(conn -> conn.shouldEvict(now), ar -> {
if (ar.succeeded()) {
List<PooledConnection> res = ar.result();
for (PooledConnection conn : res) {
Expand All @@ -166,7 +169,7 @@ public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
future = pooled.schedule(context, cmd);
}
return future.onComplete(v -> {
pooled.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
pooled.refresh();
lease.recycle();
});
});
Expand Down Expand Up @@ -261,12 +264,15 @@ public class PooledConnection implements Connection, Connection.Holder {
private Holder holder;
private Promise<ConnectResult<PooledConnection>> poolCallback;
private Lease<PooledConnection> lease;
public long expirationTimestamp;
public long idleEvictionTimestamp;
public long lifetimeEvictionTimestamp;

PooledConnection(ConnectionFactory factory, Connection conn, PoolConnector.Listener listener) {
this.factory = factory;
this.conn = conn;
this.listener = listener;
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
refresh();
}

public ConnectionFactory factory() {
Expand Down Expand Up @@ -305,6 +311,10 @@ private void close(Promise<Void> promise) {
conn.close(this, promise);
}

private void refresh() {
this.idleEvictionTimestamp = idleTimeout > 0 ? System.currentTimeMillis() + idleTimeout : Long.MAX_VALUE;
}

@Override
public void init(Holder holder) {
if (this.holder != null) {
Expand Down Expand Up @@ -348,7 +358,7 @@ private void doClose(Holder holder, Promise<Void> promise) {
private void cleanup(Promise<Void> promise) {
Lease<PooledConnection> l = this.lease;
this.lease = null;
this.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
refresh();
l.recycle();
promise.complete();
}
Expand Down Expand Up @@ -394,5 +404,18 @@ public int getSecretKey() {
public Connection unwrap() {
return conn;
}

private boolean hasIdleExpired(long now) {
return idleEvictionTimestamp < now;
}

private boolean hasLifetimeExpired(long now) {
return lifetimeEvictionTimestamp < now;
}

private boolean shouldEvict(long now) {
return hasIdleExpired(now) || hasLifetimeExpired(now);
}

}
}