Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Connection#setStatementTimeout #200

Merged
merged 7 commits into from
Jan 22, 2024
28 changes: 26 additions & 2 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
jchrys marked this conversation as resolved.
Show resolved Hide resolved
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -74,6 +75,10 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0);

private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);

private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true);

private static final BiConsumer<ServerMessage, SynchronousSink<Boolean>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
ErrorMessage msg = (ErrorMessage) message;
Expand Down Expand Up @@ -410,9 +415,28 @@ public Mono<Void> setLockWaitTimeout(Duration timeout) {
@Override
public Mono<Void> setStatementTimeout(Duration timeout) {
requireNonNull(timeout, "timeout must not be null");
final boolean isMariaDb = context.isMariaDb();
final ServerVersion serverVersion = context.getServerVersion();
final long timeoutMs = timeout.toMillis();
final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0
: "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs;

// mariadb: https://mariadb.com/kb/en/aborting-statements/
// mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/
// ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc
if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_1_1)
|| !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) {
return QueryFlow.executeVoid(client, sql);
}

// TODO: implement me
return Mono.empty();
return Mono.error(
new R2dbcNonTransientResourceException(
jchrys marked this conversation as resolved.
Show resolved Hide resolved
"Statement timeout is not supported by server version " + serverVersion,
"HY000",
-1,
sql
)
);
}

boolean isSessionAutoCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public R2dbcException toException() {
}

public R2dbcException toException(@Nullable String sql) {
// mysql: https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
// mariadb: https://mariadb.com/kb/en/mariadb-error-code-reference/
// Should keep looking more error codes
switch (code) {
case 1044: // Database access denied
Expand All @@ -93,6 +95,8 @@ public R2dbcException toException(@Nullable String sql) {
return new R2dbcTransientResourceException(message, sqlState, code);
case 1205: // Wait lock timeout
case 1907: // Statement executing timeout
case 3024: // Query execution was interrupted, maximum statement execution time exceeded
case 1969: // Query execution was interrupted
return new R2dbcTimeoutException(message, sqlState, code);
case 1613: // Transaction rollback because of took too long
return new R2dbcRollbackException(message, sqlState, code);
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;

import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Result;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -50,6 +51,10 @@ void badGrammar(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcBadGrammarException.class);
}

void timeout(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcTimeoutException.class);
}

void illegalArgument(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3));
}
Expand Down Expand Up @@ -148,4 +153,21 @@ static boolean envIsMariaDb10_5_1() {

return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 5, 1));
}

boolean envIsLessThanMySql574OrMariaDb1011() {
String version = System.getProperty("test.mysql.version");

if (version == null || version.isEmpty()) {
return true;
}

ServerVersion ver = ServerVersion.parse(version);
String type = System.getProperty("test.db.type");

if ("mariadb".equalsIgnoreCase(type)) {
return ver.isLessThan(ServerVersion.create(10, 1, 1));
}

return ver.isLessThan(ServerVersion.create(5, 7, 4));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.asyncer.r2dbc.mysql;

import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.util.ReferenceCountUtil;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -611,6 +610,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() {
);
}

@Test
@DisabledIf("envIsLessThanMySql574OrMariaDb1011")
void setStatementTimeoutTest() {
final String sql = "SELECT 1 WHERE SLEEP(1) > 1";
timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500))
.then(Mono.from(connection.createStatement(sql).execute()))
.flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class))))
.collectList()
);
}

private static JsonNode parseJson(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
Expand Down