diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 771a923e9..c54eb05bd 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -75,9 +75,9 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0); - private static final ServerVersion MYSQL_STATEMENT_TIMEOUT = ServerVersion.create(5, 7, 4); + private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4); - private static final ServerVersion MARIA_STATEMENT_TIMEOUT = ServerVersion.create(10, 2, 0); + private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0); private static final BiConsumer> PING = (message, sink) -> { if (message instanceof ErrorMessage) { @@ -403,13 +403,15 @@ public Mono setStatementTimeout(Duration timeout) { requireNonNull(timeout, "timeout must not be null"); final boolean isMariaDb = context.getCapability().isMariaDb(); final ServerVersion serverVersion = context.getServerVersion(); - final String sql = isMariaDb ? "SET max_statement_time=" + timeout.getSeconds() - : "SET SESSION MAX_EXECUTION_TIME=" + timeout.toMillis(); + final long timeoutMs = timeout.toMillis(); + final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0 + : "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs; // mariadb: https://mariadb.com/kb/en/aborting-statements/ // mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/ - if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_STATEMENT_TIMEOUT) - || !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_STATEMENT_TIMEOUT)) { + // ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc + if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0) + || !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) { return QueryFlow.executeVoid(client, sql); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java index 9dee369ee..2e3503af6 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java @@ -93,6 +93,7 @@ public R2dbcException toException(@Nullable String sql) { return new R2dbcTransientResourceException(message, sqlState, code); case 1205: // Wait lock timeout case 1907: // Statement executing timeout + case 3024: // query timeout return new R2dbcTimeoutException(message, sqlState, code); case 1613: // Transaction rollback because of took too long return new R2dbcRollbackException(message, sqlState, code); diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 11e754e99..3f81a383d 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -17,6 +17,7 @@ package io.asyncer.r2dbc.mysql; import io.r2dbc.spi.R2dbcBadGrammarException; +import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Result; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -50,6 +51,10 @@ void badGrammar(Function> runner) { process(runner).verifyError(R2dbcBadGrammarException.class); } + void timeout(Function> runner) { + process(runner).verifyError(R2dbcTimeoutException.class); + } + void illegalArgument(Function> runner) { process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3)); } @@ -136,4 +141,21 @@ boolean envIsLessThanMySql57OrMariaDb102() { return ver.isLessThan(ServerVersion.create(5, 7, 0)); } + + boolean envIsLessThanMySql574OrMariaDb102() { + String version = System.getProperty("test.mysql.version"); + + if (version == null || version.isEmpty()) { + return true; + } + + ServerVersion ver = ServerVersion.parse(version); + String type = System.getProperty("test.db.type"); + + if ("mariadb".equalsIgnoreCase(type)) { + return ver.isLessThan(ServerVersion.create(10, 2, 0)); + } + + return ver.isLessThan(ServerVersion.create(5, 7, 4)); + } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java index d03811faf..9a0051457 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java @@ -612,6 +612,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() { ); } + @Test + @DisabledIf("envIsLessThanMySql574OrMariaDb102") + void setStatementTimeoutTest() { + final String sql = "SELECT 1 WHERE SLEEP(1) > 1"; + timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500)) + .then(Mono.from(connection.createStatement(sql).execute())) + .flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class)))) + .doOnNext(it -> System.out.println(it)) + ); + } + private static JsonNode parseJson(String json) { ObjectMapper mapper = new ObjectMapper(); try {