diff --git a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java index 236801e26..33f8cf551 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java @@ -30,6 +30,13 @@ interface ConnectionState { */ void setIsolationLevel(IsolationLevel level); + /** + * Reutrns session lock wait timeout. + * + * @return Session lock wait timeout. + */ + long getSessionLockWaitTimeout(); + /** * Sets current lock wait timeout. * diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index f83c76c38..ed8d04b42 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -209,7 +209,7 @@ public Mono close() { @Override public Mono commitTransaction() { return Mono.defer(() -> { - return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported); + return QueryFlow.doneTransaction(client, this, true, batchSupported); }); } @@ -223,19 +223,7 @@ public MySqlBatch createBatch() { @Override public Mono createSavepoint(String name) { requireValidName(name, "Savepoint name must not be empty and not contain backticks"); - - String sql = String.format("SAVEPOINT `%s`", name); - - return Mono.defer(() -> { - if (isInTransaction()) { - return QueryFlow.executeVoid(client, sql); - } else if (batchSupported) { - // If connection does not in transaction, then starts transaction. - return QueryFlow.executeVoid(client, "BEGIN;" + sql); - } - - return QueryFlow.executeVoid(client, "BEGIN", sql); - }); + return QueryFlow.createSavepoint(client, this, name, batchSupported); } @Override @@ -286,7 +274,7 @@ public Mono releaseSavepoint(String name) { @Override public Mono rollbackTransaction() { return Mono.defer(() -> { - return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported); + return QueryFlow.doneTransaction(client, this, false, batchSupported); }); } @@ -371,6 +359,11 @@ public void setIsolationLevel(IsolationLevel level) { this.currentLevel = level; } + @Override + public long getSessionLockWaitTimeout() { + return lockWaitTimeout; + } + @Override public void setCurrentLockWaitTimeout(long timeoutSeconds) { this.currentLockWaitTimeout = timeoutSeconds; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index f25365cdb..f6d56cf6c 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -249,9 +249,9 @@ static Mono executeVoid(Client client, String... statements) { */ static Mono beginTransaction(Client client, ConnectionState state, boolean batchSupported, TransactionDefinition definition) { - StartTransactionState startState = StartTransactionState.of(state, definition); + final StartTransactionState startState = new StartTransactionState(state, definition); - if (batchSupported || startState.isSimple()) { + if (batchSupported) { return client.exchange(new TransactionBatchExchangeable(startState)).then(); } @@ -265,21 +265,27 @@ static Mono beginTransaction(Client client, ConnectionState state, boolean * @param client the {@link Client} to exchange messages with. * @param state the connection state for checks and resets transaction statuses. * @param commit if commit, otherwise rollback. - * @param lockWaitTimeout the lock wait timeout of the initial connection state. * @param batchSupported if connection supports batch query. * @return receives complete signal. */ - static Mono doneTransaction(Client client, ConnectionState state, boolean commit, - long lockWaitTimeout, boolean batchSupported) { - CommitRollbackState commitState = CommitRollbackState.of(state, commit, lockWaitTimeout); + static Mono doneTransaction(Client client, ConnectionState state, boolean commit, boolean batchSupported) { + final CommitRollbackState commitState = new CommitRollbackState(state, commit); - if (batchSupported || commitState.isSimple()) { + if (batchSupported) { return client.exchange(new TransactionBatchExchangeable(commitState)).then(); } return client.exchange(new TransactionMultiExchangeable(commitState)).then(); } + static Mono createSavepoint(Client client, ConnectionState state, String name, boolean batchSupported) { + final CreateSavepointState savepointState = new CreateSavepointState(state, name); + if (batchSupported) { + return client.exchange(new TransactionBatchExchangeable(savepointState)).then(); + } + return client.exchange(new TransactionMultiExchangeable(savepointState)).then(); + } + /** * Execute a simple query statement. Query execution terminates with the last {@link CompleteMessage} or a * {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. The exchange will be completed @@ -979,30 +985,24 @@ abstract class AbstractTransactionState { final ConnectionState state; + final List statements = new ArrayList<>(5); + /** * A bitmap of unfinished tasks, the lowest one bit is current task. */ int tasks; - private final List statements; - @Nullable private String sql; - protected AbstractTransactionState(ConnectionState state, int tasks, List statements) { + protected AbstractTransactionState(ConnectionState state) { this.state = state; - this.tasks = tasks; - this.statements = statements; } final void setSql(String sql) { this.sql = sql; } - final boolean isSimple() { - return statements.size() == 1; - } - final String batchStatement() { if (statements.size() == 1) { return statements.get(0); @@ -1050,19 +1050,31 @@ final class CommitRollbackState extends AbstractTransactionState { private static final int COMMIT_OR_ROLLBACK = 2; - private CommitRollbackState(ConnectionState state, int tasks, List statements) { - super(state, tasks, statements); + private static final int CANCEL = 4; + + private final boolean commit; + + CommitRollbackState(ConnectionState state, boolean commit) { + super(state); + this.commit = commit; } @Override boolean cancelTasks() { - if (state.isInTransaction()) { - return false; + if (!state.isInTransaction()) { + tasks |= CANCEL; + return true; } - this.tasks = COMMIT_OR_ROLLBACK; + if (state.isLockWaitTimeoutChanged()) { + tasks |= LOCK_WAIT_TIMEOUT; + statements.add("SET innodb_lock_wait_timeout=" + state.getSessionLockWaitTimeout()); + } - return true; + tasks |= COMMIT_OR_ROLLBACK; + final String doneSql = commit ? "COMMIT" : "ROLLBACK"; + statements.add(doneSql); + return false; } @Override @@ -1075,27 +1087,15 @@ protected boolean process(int task, SynchronousSink sink) { state.resetIsolationLevel(); sink.complete(); return false; + case CANCEL: + sink.complete(); + return false; } sink.error(new IllegalStateException("Undefined commit task: " + task + ", remain: " + tasks)); return false; } - - static CommitRollbackState of(ConnectionState state, boolean commit, long lockWaitTimeout) { - String doneSql = commit ? "COMMIT" : "ROLLBACK"; - - if (state.isLockWaitTimeoutChanged()) { - List statements = new ArrayList<>(2); - - statements.add("SET innodb_lock_wait_timeout=" + lockWaitTimeout); - statements.add(doneSql); - - return new CommitRollbackState(state, LOCK_WAIT_TIMEOUT | COMMIT_OR_ROLLBACK, statements); - } - - return new CommitRollbackState(state, COMMIT_OR_ROLLBACK, Collections.singletonList(doneSql)); - } } final class StartTransactionState extends AbstractTransactionState { @@ -1106,25 +1106,37 @@ final class StartTransactionState extends AbstractTransactionState { private static final int START_TRANSACTION = 4; - private final long lockWaitTimeout; - - @Nullable - private final IsolationLevel isolationLevel; + private static final int CANCEL = 8; - private StartTransactionState(ConnectionState state, int tasks, List statements, - long lockWaitTimeout, @Nullable IsolationLevel level) { - super(state, tasks, statements); + private final TransactionDefinition definition; - this.lockWaitTimeout = lockWaitTimeout; - this.isolationLevel = level; + StartTransactionState(ConnectionState state, TransactionDefinition definition) { + super(state); + this.definition = definition; } @Override boolean cancelTasks() { if (state.isInTransaction()) { - this.tasks = START_TRANSACTION; + tasks |= CANCEL; return true; } + final Duration timeout = definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT); + if (timeout != null) { + final long lockWaitTimeout = timeout.getSeconds(); + tasks |= LOCK_WAIT_TIMEOUT; + statements.add("SET innodb_lock_wait_timeout=" + lockWaitTimeout); + } + + final IsolationLevel isolationLevel = definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL); + + if (isolationLevel != null) { + statements.add("SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()); + tasks |= ISOLATION_LEVEL; + } + + tasks |= START_TRANSACTION; + statements.add(buildStartTransaction(definition)); return false; } @@ -1133,58 +1145,29 @@ boolean cancelTasks() { protected boolean process(int task, SynchronousSink sink) { switch (task) { case LOCK_WAIT_TIMEOUT: - state.setCurrentLockWaitTimeout(lockWaitTimeout); + final Duration timeout = definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT); + if (timeout != null) { + final long lockWaitTimeout = timeout.getSeconds(); + state.setCurrentLockWaitTimeout(lockWaitTimeout); + } return true; case ISOLATION_LEVEL: + final IsolationLevel isolationLevel = definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL); if (isolationLevel != null) { state.setIsolationLevel(isolationLevel); } return true; case START_TRANSACTION: + case CANCEL: sink.complete(); return false; + } sink.error(new IllegalStateException("Undefined transaction task: " + task + ", remain: " + tasks)); - return false; } - static StartTransactionState of(ConnectionState state, TransactionDefinition definition) { - int tasks = START_TRANSACTION; - Duration timeout = definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT); - List statements = null; - long lockWaitTimeout; - - if (timeout == null) { - lockWaitTimeout = Long.MIN_VALUE; - } else { - lockWaitTimeout = timeout.getSeconds(); - statements = new ArrayList<>(3); - statements.add("SET innodb_lock_wait_timeout=" + lockWaitTimeout); - tasks |= LOCK_WAIT_TIMEOUT; - } - - IsolationLevel isolationLevel = definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL); - - if (isolationLevel != null) { - if (statements == null) { - statements = new ArrayList<>(3); - } - statements.add("SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()); - tasks |= ISOLATION_LEVEL; - } - - if (statements == null) { - return new StartTransactionState(state, tasks, - Collections.singletonList(buildStartTransaction(definition)), lockWaitTimeout, null); - } - - statements.add(buildStartTransaction(definition)); - - return new StartTransactionState(state, tasks, statements, lockWaitTimeout, isolationLevel); - } - private static String buildStartTransaction(TransactionDefinition definition) { Boolean readOnly = definition.getAttribute(TransactionDefinition.READ_ONLY); Boolean snapshot = definition.getAttribute(MySqlTransactionDefinition.WITH_CONSISTENT_SNAPSHOT); @@ -1227,8 +1210,50 @@ private static String buildStartTransaction(TransactionDefinition definition) { } } +final class CreateSavepointState extends AbstractTransactionState { + + private static final int START_TRANSACTION = 1; + + private static final int CREATE_SAVEPOINT = 2; + + private final String name; + + CreateSavepointState(final ConnectionState state, final String name) { + super(state); + this.name = name; + } + + @Override + boolean cancelTasks() { + if (!state.isInTransaction()) { + tasks |= START_TRANSACTION; + statements.add("BEGIN"); + } + + final String doneSql = String.format("SAVEPOINT `%s`", name); + tasks |= CREATE_SAVEPOINT; + statements.add(doneSql); + return false; + } + + @Override + protected boolean process(int task, SynchronousSink sink) { + switch (task) { + case START_TRANSACTION: + return true; + case CREATE_SAVEPOINT: + sink.complete(); + return false; + } + + sink.error(new IllegalStateException("Undefined transaction task: " + task + ", remain: " + tasks)); + return false; + } +} + final class TransactionBatchExchangeable extends FluxExchangeable { + private final AbstractTransactionState state; TransactionBatchExchangeable(AbstractTransactionState state) { @@ -1249,7 +1274,6 @@ public void dispose() { public void subscribe(CoreSubscriber s) { if (state.cancelTasks()) { s.onSubscribe(Operators.scalarSubscription(s, PingMessage.INSTANCE)); - return; } @@ -1268,11 +1292,11 @@ final class TransactionMultiExchangeable extends FluxExchangeable { private final AbstractTransactionState state; - private final Iterator statements; + @Nullable + private Iterator statements; TransactionMultiExchangeable(AbstractTransactionState state) { this.state = state; - this.statements = state.statements(); } @Override @@ -1305,6 +1329,7 @@ public void subscribe(CoreSubscriber s) { return; } + statements = state.statements(); String sql = statements.next();