diff --git a/CHANGELOG.md b/CHANGELOG.md index aa465a19065..456e9bafe4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # Changelog +## [6.84.0](https://github.com/googleapis/java-spanner/compare/v6.83.0...v6.84.0) (2025-01-06) + + +### Features + +* Add support for ARRAY<STRUCT> to CloudCilentExecutor ([#3544](https://github.com/googleapis/java-spanner/issues/3544)) ([6cbaf7e](https://github.com/googleapis/java-spanner/commit/6cbaf7ec6502d04fc0a0c09720e2054bd10bead9)) +* Add transaction runner for connections ([#3559](https://github.com/googleapis/java-spanner/issues/3559)) ([5a1be3d](https://github.com/googleapis/java-spanner/commit/5a1be3dedeafa6858502eadc7918820b9cd90f68)) +* Exposing InstanceType in Instance configuration (to define PROVISIONED or FREE spanner instance) ([8d295c4](https://github.com/googleapis/java-spanner/commit/8d295c4a4030b4e97b1d653cc3baf412864f3042)) +* Improve tracing by adding attributes ([#3576](https://github.com/googleapis/java-spanner/issues/3576)) ([eee333b](https://github.com/googleapis/java-spanner/commit/eee333b51fa69123e011dfbd2a0896fd31ac10dc)) +* **spanner:** Add jdbc support for external hosts ([#3536](https://github.com/googleapis/java-spanner/issues/3536)) ([801346a](https://github.com/googleapis/java-spanner/commit/801346a1b2efe7d0144f7442e1568eb5b02ddcbc)) + + +### Bug Fixes + +* AsyncTransactionManager did not always close the session ([#3580](https://github.com/googleapis/java-spanner/issues/3580)) ([d9813a0](https://github.com/googleapis/java-spanner/commit/d9813a05240b966f444168d3b8c30da9d27a8cc4)) +* Retry specific internal errors ([#3565](https://github.com/googleapis/java-spanner/issues/3565)) ([b9ce1a6](https://github.com/googleapis/java-spanner/commit/b9ce1a6fcbd11373a5cc82807af15c1cca0dd48e)) +* Update max_in_use_session at 10 mins interval ([#3570](https://github.com/googleapis/java-spanner/issues/3570)) ([cc1753d](https://github.com/googleapis/java-spanner/commit/cc1753da72b3e508f8fea8a6d19e1ed3f34e3602)) + + +### Dependencies + +* Update opentelemetry.version to v1.45.0 ([#3531](https://github.com/googleapis/java-spanner/issues/3531)) ([78c82ed](https://github.com/googleapis/java-spanner/commit/78c82edb4fcc4a5a9a372225ca429038c3b34955)) + ## [6.83.0](https://github.com/googleapis/java-spanner/compare/v6.82.0...v6.83.0) (2024-12-13) diff --git a/README.md b/README.md index 2d74ad3c5d5..a64487c221e 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-spanner' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.83.0' +implementation 'com.google.cloud:google-cloud-spanner:6.84.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.83.0" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.84.0" ``` ## Authentication @@ -725,7 +725,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.83.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.84.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 31ad076c283..27f32c0e9c8 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -24,7 +24,7 @@ com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner-bom/pom.xml b/google-cloud-spanner-bom/pom.xml index d8f035ea2fb..0416dc94151 100644 --- a/google-cloud-spanner-bom/pom.xml +++ b/google-cloud-spanner-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner-bom - 6.83.1-SNAPSHOT + 6.84.0 pom com.google.cloud @@ -53,43 +53,43 @@ com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 com.google.cloud google-cloud-spanner test-jar - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index dd43ae439e6..07cb7db2abc 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -5,14 +5,14 @@ 4.0.0 com.google.cloud google-cloud-spanner-executor - 6.83.1-SNAPSHOT + 6.84.0 jar Google Cloud Spanner Executor com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 5b84cb4ebc3..c6796085d83 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -814,6 +814,12 @@ com/google/cloud/spanner/connection/TransactionRetryListener void retryDmlAsPartitionedDmlFailed(java.util.UUID, com.google.cloud.spanner.Statement, java.lang.Throwable) - + + + + 7012 + com/google/cloud/spanner/connection/Connection + java.lang.Object runTransaction(com.google.cloud.spanner.connection.Connection$TransactionCallable) + diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index c5128cc19f7..2e93e6e5227 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 jar Google Cloud Spanner https://github.com/googleapis/java-spanner @@ -11,7 +11,7 @@ com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 google-cloud-spanner diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 01f41a2dfdc..89371a21c51 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) { // initiate a begin transaction request to verify if read-write transactions are // supported using multiplexed sessions. if (sessionClient - .getSpanner() - .getOptions() - .getSessionPoolOptions() - .getUseMultiplexedSessionForRW()) { + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getUseMultiplexedSessionForRW() + && !sessionClient + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getSkipVerifyBeginTransactionForMuxRW()) { verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName()); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 90f5317e88d..3d6a015531f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -107,6 +107,7 @@ public void onSuccess(AsyncTransactionManagerImpl result) { new ApiFutureCallback() { @Override public void onFailure(Throwable t) { + session.close(); res.setException(t); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 36a4e5fe208..03551640b43 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -83,6 +83,7 @@ public class SessionPoolOptions { // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; + private final boolean skipVerifyingBeginTransactionForMuxRW; private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. @@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) { ? useMultiplexedSessionFromEnvVariablePartitionedOps : builder.useMultiplexedSessionPartitionedOps; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; + this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW; } @Override @@ -169,8 +171,10 @@ public boolean equals(Object o) { && Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession) && Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW) && Objects.equals( - this.multiplexedSessionMaintenanceDuration, - other.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration) + && Objects.equals( + this.skipVerifyingBeginTransactionForMuxRW, + other.skipVerifyingBeginTransactionForMuxRW); } @Override @@ -199,7 +203,8 @@ public int hashCode() { this.poolMaintainerClock, this.useMultiplexedSession, this.useMultiplexedSessionForRW, - this.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, + this.skipVerifyingBeginTransactionForMuxRW); } public Builder toBuilder() { @@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() { return multiplexedSessionMaintenanceDuration; } + @VisibleForTesting + @InternalApi + boolean getSkipVerifyBeginTransactionForMuxRW() { + return skipVerifyingBeginTransactionForMuxRW; + } + public static Builder newBuilder() { return new Builder(); } @@ -607,6 +618,7 @@ public static class Builder { private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; + private boolean skipVerifyingBeginTransactionForMuxRW = false; private static Position getReleaseToPositionFromSystemProperty() { // NOTE: This System property is a beta feature. Support for it can be removed in the future. @@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) { this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; + this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW; } /** @@ -872,6 +885,18 @@ Builder setMultiplexedSessionMaintenanceDuration( return this; } + // The additional BeginTransaction RPC for multiplexed session read-write is causing + // unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC. + // Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed + // session is created for the first time during client initialization. + // This is only used for tests. + @VisibleForTesting + Builder setSkipVerifyingBeginTransactionForMuxRW( + boolean skipVerifyingBeginTransactionForMuxRW) { + this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW; + return this; + } + /** * Sets whether the client should automatically execute a background query to detect the dialect * that is used by the database or not. Set this option to true if you do not know what the diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index cafb27ba6b7..b1d37f3e4cd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -99,7 +99,7 @@ public void rollback() { public TransactionContext resetForRetry() { if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) { throw new IllegalStateException( - "resetForRetry can only be called if the previous attempt" + " aborted"); + "resetForRetry can only be called if the previous attempt aborted"); } try (IScope s = tracer.withSpan(span)) { boolean useInlinedBegin = txn.transactionId != null; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java index 547d2466e3e..eb69ae132cc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java @@ -835,6 +835,21 @@ default boolean isKeepTransactionAlive() { */ ApiFuture rollbackAsync(); + /** Functional interface for the {@link #runTransaction(TransactionCallable)} method. */ + interface TransactionCallable { + /** This method is invoked with a fresh transaction on the connection. */ + T run(Connection transaction); + } + + /** + * Runs the given callable in a transaction. The transaction type is determined by the current + * state of the connection. That is; if the connection is in read/write mode, the transaction type + * will be a read/write transaction. If the connection is in read-only mode, it will be a + * read-only transaction. The transaction will automatically be retried if it is aborted by + * Spanner. + */ + T runTransaction(TransactionCallable callable); + /** Returns the current savepoint support for this connection. */ SavepointSupport getSavepointSupport(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index 2d7c917d230..5ea249ee0ac 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -194,6 +194,11 @@ private LeakedConnectionException() { */ private final ConnectionOptions options; + enum Caller { + APPLICATION, + TRANSACTION_RUNNER, + } + /** The supported batch modes. */ enum BatchMode { NONE, @@ -267,6 +272,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) { */ private boolean transactionBeginMarked = false; + /** This field is set to true when a transaction runner is active for this connection. */ + private boolean transactionRunnerActive = false; + private BatchMode batchMode; private UnitOfWorkType unitOfWorkType; private final Stack transactionStack = new Stack<>(); @@ -1164,16 +1172,19 @@ public void onFailure() { @Override public void commit() { - get(commitAsync(CallType.SYNC)); + get(commitAsync(CallType.SYNC, Caller.APPLICATION)); } @Override public ApiFuture commitAsync() { - return commitAsync(CallType.ASYNC); + return commitAsync(CallType.ASYNC, Caller.APPLICATION); } - private ApiFuture commitAsync(CallType callType) { + ApiFuture commitAsync(CallType callType, Caller caller) { ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState( + !transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER, + "Cannot call commit when a transaction runner is active"); maybeAutoCommitOrFlushCurrentUnitOfWork(COMMIT_STATEMENT.getType(), COMMIT_STATEMENT); return endCurrentTransactionAsync(callType, commit, COMMIT_STATEMENT); } @@ -1201,16 +1212,19 @@ public void onFailure() { @Override public void rollback() { - get(rollbackAsync(CallType.SYNC)); + get(rollbackAsync(CallType.SYNC, Caller.APPLICATION)); } @Override public ApiFuture rollbackAsync() { - return rollbackAsync(CallType.ASYNC); + return rollbackAsync(CallType.ASYNC, Caller.APPLICATION); } - private ApiFuture rollbackAsync(CallType callType) { + ApiFuture rollbackAsync(CallType callType, Caller caller) { ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState( + !transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER, + "Cannot call rollback when a transaction runner is active"); maybeAutoCommitOrFlushCurrentUnitOfWork(ROLLBACK_STATEMENT.getType(), ROLLBACK_STATEMENT); return endCurrentTransactionAsync(callType, rollback, ROLLBACK_STATEMENT); } @@ -1243,6 +1257,27 @@ private ApiFuture endCurrentTransactionAsync( return res; } + @Override + public T runTransaction(TransactionCallable callable) { + ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState(!isBatchActive(), "Cannot run transaction while in a batch"); + ConnectionPreconditions.checkState( + !isTransactionStarted(), "Cannot run transaction when a transaction is already active"); + ConnectionPreconditions.checkState( + !transactionRunnerActive, "A transaction runner is already active for this connection"); + this.transactionRunnerActive = true; + try { + return new TransactionRunnerImpl(this).run(callable); + } finally { + this.transactionRunnerActive = false; + } + } + + void resetForRetry(UnitOfWork retryUnitOfWork) { + retryUnitOfWork.resetForRetry(); + this.currentUnitOfWork = retryUnitOfWork; + } + @Override public SavepointSupport getSavepointSupport() { return getConnectionPropertyValue(SAVEPOINT_SUPPORT); @@ -2000,7 +2035,7 @@ private UnitOfWork maybeStartAutoDmlBatch(UnitOfWork transaction) { return transaction; } - private UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() { + UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() { return getCurrentUnitOfWorkOrStartNewUnitOfWork( StatementType.UNKNOWN, /* parsedStatement = */ null, /* internalMetadataQuery = */ false); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 4ae0ae00608..1f6ab6bf0c6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -1261,6 +1261,11 @@ private ApiFuture rollbackAsync(CallType callType, boolean updateStatusAnd } } + @Override + public void resetForRetry() { + txContextFuture = ApiFutures.immediateFuture(txManager.resetForRetry()); + } + @Override String getUnitOfWorkName() { return "read/write transaction"; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java new file mode 100644 index 00000000000..6c959d3e5f9 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.connection; + +import static com.google.cloud.spanner.SpannerApiFutures.get; + +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.connection.Connection.TransactionCallable; +import com.google.cloud.spanner.connection.ConnectionImpl.Caller; +import com.google.cloud.spanner.connection.UnitOfWork.CallType; + +class TransactionRunnerImpl { + private final ConnectionImpl connection; + + TransactionRunnerImpl(ConnectionImpl connection) { + this.connection = connection; + } + + T run(TransactionCallable callable) { + connection.beginTransaction(); + // Disable internal retries during this transaction. + connection.setRetryAbortsInternally(/* retryAbortsInternally = */ false, /* local = */ true); + UnitOfWork transaction = connection.getCurrentUnitOfWorkOrStartNewUnitOfWork(); + while (true) { + try { + T result = callable.run(connection); + get(connection.commitAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER)); + return result; + } catch (AbortedException abortedException) { + try { + //noinspection BusyWait + Thread.sleep(abortedException.getRetryDelayInMillis()); + connection.resetForRetry(transaction); + } catch (InterruptedException interruptedException) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw SpannerExceptionFactory.propagateInterrupt(interruptedException); + } catch (Throwable t) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw t; + } + } catch (Throwable t) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw t; + } + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java index ffa93d486e1..80981922225 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java @@ -125,6 +125,10 @@ interface EndTransactionCallback { ApiFuture rollbackAsync( @Nonnull CallType callType, @Nonnull EndTransactionCallback callback); + default void resetForRetry() { + throw new UnsupportedOperationException(); + } + /** @see Connection#savepoint(String) */ void savepoint(@Nonnull String name, @Nonnull Dialect dialect); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index 2449b8fba7c..371d3688c94 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; @@ -1084,6 +1085,37 @@ public void onSuccess(Long aLong) { } } + @Test + public void testAbandonedAsyncTransactionManager_rollbackFails() throws Exception { + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofException(Status.PERMISSION_DENIED.asRuntimeException())); + + boolean gotException = false; + try (AsyncTransactionManager manager = client().transactionManagerAsync()) { + TransactionContextFuture transactionContextFuture = manager.beginAsync(); + while (true) { + try { + AsyncTransactionStep updateCount = + transactionContextFuture.then( + (transactionContext, ignored) -> + transactionContext.executeUpdateAsync(UPDATE_STATEMENT), + executor); + assertEquals(1L, updateCount.get().longValue()); + // Break without committing or rolling back the transaction. + break; + } catch (AbortedException e) { + transactionContextFuture = manager.resetForRetryAsync(); + } + } + } catch (SpannerException spannerException) { + // The error from the automatically executed Rollback is surfaced when the + // AsyncTransactionManager is closed. + assertEquals(ErrorCode.PERMISSION_DENIED, spannerException.getErrorCode()); + gotException = true; + } + assertTrue(gotException); + } + private boolean isMultiplexedSessionsEnabled() { if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { return false; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 86d0bfc2c94..1ee60509d55 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -270,6 +270,9 @@ public void tearDown() { @Test public void testPoolMaintainer_whenInactiveTransactionAndSessionIsNotFoundOnBackend_removeSessionsFromPool() { + assumeFalse( + "Session pool maintainer test skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -347,6 +350,9 @@ public void tearDown() { @Test public void testPoolMaintainer_whenInactiveTransactionAndSessionExistsOnBackend_removeSessionsFromPool() { + assumeFalse( + "Session leaks tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -482,6 +488,9 @@ public void testPoolMaintainer_whenLongRunningPartitionedUpdateRequest_takeNoAct */ @Test public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessionsFromPool() { + assumeFalse( + "Session leaks tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -3085,6 +3094,7 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() { .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); // No additional requests should have been sent by the client. // Note that in case of the use of multiplexed sessions, then we have 2 requests: + // Note that in case of the use of regular sessions, then we have 1 request: // 1. BatchCreateSessions for the session pool. // 2. CreateSession for the multiplexed session. assertThat(mockSpanner.getRequests()) @@ -3211,9 +3221,16 @@ public void testDatabaseOrInstanceIsDeletedAndThenRecreated() throws Exception { ResourceNotFoundException.class, () -> dbClient.singleUse().executeQuery(SELECT1)); } - assertThrows( - ResourceNotFoundException.class, - () -> dbClient.readWriteTransaction().run(transaction -> null)); + if (!spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) { + // We only verify this for read-write transactions if we are not using multiplexed + // sessions. For multiplexed sessions, we don't need any special handling, as deleting the + // database will also invalidate the multiplexed session, and trying to continue to use it + // will continue to return an error. + assertThrows( + ResourceNotFoundException.class, + () -> dbClient.readWriteTransaction().run(transaction -> null)); + } + assertThat(mockSpanner.getRequests()).isEmpty(); // Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will // return the same instance, but not when the instance has been invalidated by a @@ -3300,13 +3317,18 @@ public void testAllowNestedTransactions() throws InterruptedException { Thread.sleep(1L); } assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + int expectedMinSessions = + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW() + ? minSessions + : minSessions - 1; Long res = client .readWriteTransaction() .allowNestedTransaction() .run( transaction -> { - assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client.pool.getNumberOfSessionsInPool()) + .isEqualTo(expectedMinSessions); return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(res).isEqualTo(UPDATE_COUNT); @@ -3333,6 +3355,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio } assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + // When read-write transaction uses multiplexed sessions, then sessions are not checked out from + // the session pool. + int expectedMinSessions = isMultiplexedSessionsEnabledForRW() ? minSessions : minSessions - 1; Long res = client1 .readWriteTransaction() @@ -3341,7 +3366,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio transaction -> { // Client1 should have 1 session checked out. // Client2 should have 0 sessions checked out. - assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client1.pool.getNumberOfSessionsInPool()) + .isEqualTo(expectedMinSessions); assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); Long add = client2 @@ -3350,9 +3376,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio transaction1 -> { // Both clients should now have 1 session checked out. assertThat(client1.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); + .isEqualTo(expectedMinSessions); assertThat(client2.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); + .isEqualTo(expectedMinSessions); try (ResultSet rs = transaction1.executeQuery(SELECT1)) { if (rs.next()) { return rs.getLong(0); @@ -5090,6 +5116,9 @@ public void testRetryOnResourceExhausted() { @Test public void testSessionPoolExhaustedError_containsStackTraces() { + assumeFalse( + "Session pool tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); try (Spanner spanner = SpannerOptions.newBuilder() .setProjectId(TEST_PROJECT) @@ -5450,4 +5479,11 @@ private boolean isMultiplexedSessionsEnabled() { } return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); } + + private boolean isMultiplexedSessionsEnabledForRW() { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index c67c0084674..94b6de149c3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -191,6 +191,13 @@ public void setUp() { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setTrackTransactionStarter() + // The extra BeginTransaction RPC for multiplexed session read-write is causing + // unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore, + // this is being skipped. + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) .build() .getService(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 676cb05eb07..9f27b28d323 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -62,6 +62,7 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; @@ -1829,7 +1830,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) { transactionId = null; break; case BEGIN: - transactionId = beginTransaction(session, tx.getBegin(), null).getId(); + transactionId = beginTransaction(session, tx.getBegin(), null, null).getId(); break; case ID: Transaction transaction = transactions.get(tx.getId()); @@ -1895,7 +1896,8 @@ public void beginTransaction( beginTransactionExecutionTime.simulateExecutionTime( exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = - beginTransaction(session, request.getOptions(), request.getMutationKey()); + beginTransaction( + session, request.getOptions(), request.getMutationKey(), request.getRequestOptions()); responseObserver.onNext(transaction); responseObserver.onCompleted(); } catch (StatusRuntimeException t) { @@ -1906,7 +1908,10 @@ public void beginTransaction( } private Transaction beginTransaction( - Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) { + Session session, + TransactionOptions options, + com.google.spanner.v1.Mutation mutationKey, + RequestOptions requestOptions) { ByteString transactionId = generateTransactionName(session.getName()); Transaction.Builder builder = Transaction.newBuilder().setId(transactionId); if (options != null && options.getModeCase() == ModeCase.READ_ONLY) { @@ -1920,12 +1925,17 @@ private Transaction beginTransaction( } Transaction transaction = builder.build(); transactions.put(transaction.getId(), transaction); - transactionsStarted.add(transaction.getId()); + // TODO: remove once UNIMPLEMENTED error is not thrown for read-write mux + // Do not consider the transaction if this request was from background thread + if (requestOptions == null + || !requestOptions.getTransactionTag().equals("multiplexed-rw-background-begin-txn")) { + transactionsStarted.add(transaction.getId()); + if (abortNextTransaction.getAndSet(false)) { + markAbortedTransaction(transaction.getId()); + } + } isPartitionedDmlTransaction.put( transaction.getId(), options.getModeCase() == ModeCase.PARTITIONED_DML); - if (abortNextTransaction.getAndSet(false)) { - markAbortedTransaction(transaction.getId()); - } return transaction; } @@ -2025,7 +2035,8 @@ public void commit(CommitRequest request, StreamObserver respons TransactionOptions.newBuilder() .setReadWrite(ReadWrite.getDefaultInstance()) .build(), - null); + null, + request.getRequestOptions()); } else if (request.getTransactionId() != null) { transaction = transactions.get(request.getTransactionId()); Optional aborted = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java index e4d25f1d9b3..65bb5f5f0d7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java @@ -135,6 +135,7 @@ public void createSpannerInstance() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) .setEnableApiTracing(true) .build() @@ -428,6 +429,7 @@ public boolean isEnableApiTracing() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) .build() .getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index 36a4e697a32..4a9fdf8cb55 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -159,6 +159,7 @@ public void createSpannerInstance() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) .setBuiltInMetricsEnabled(false) .setApiTracerFactory(metricsTracerFactory) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index f070f154216..5496ad531bf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -538,6 +538,9 @@ public void readOnlyTransactionReadRowUsingIndexNonRecoverable() throws Interrup @Test public void readWriteTransactionReadOnlySessionInPool() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -568,6 +571,9 @@ public void readWriteTransactionReadOnlySessionInPool() throws InterruptedExcept @Test public void readWriteTransactionSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -582,6 +588,9 @@ public void readWriteTransactionSelect() throws InterruptedException { @Test public void readWriteTransactionRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -597,6 +606,9 @@ public void readWriteTransactionRead() throws InterruptedException { @Test public void readWriteTransactionReadWithOptimisticLock() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(Options.optimisticLock()); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -612,6 +624,9 @@ public void readWriteTransactionReadWithOptimisticLock() throws InterruptedExcep @Test public void readWriteTransactionReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -628,6 +643,9 @@ public void readWriteTransactionReadUsingIndex() throws InterruptedException { @Test public void readWriteTransactionReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -638,6 +656,9 @@ public void readWriteTransactionReadRow() throws InterruptedException { @Test public void readWriteTransactionReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -649,6 +670,9 @@ public void readWriteTransactionReadRowUsingIndex() throws InterruptedException @Test public void readWriteTransactionUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); @@ -656,6 +680,9 @@ public void readWriteTransactionUpdate() throws InterruptedException { @Test public void readWriteTransactionBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -666,6 +693,9 @@ public void readWriteTransactionBatchUpdate() throws InterruptedException { @Test public void readWriteTransactionBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -678,6 +708,9 @@ public void readWriteTransactionBuffer() throws InterruptedException { @Test public void readWriteTransactionSelectInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -702,6 +735,9 @@ public void readWriteTransactionSelectInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -728,6 +764,9 @@ public void readWriteTransactionReadInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadUsingIndexInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -756,6 +795,9 @@ public void readWriteTransactionReadUsingIndexInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadRowInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -778,6 +820,9 @@ public void readWriteTransactionReadRowInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadRowUsingIndexInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -803,6 +848,9 @@ public void readWriteTransactionReadRowUsingIndexInvalidatedDuringTransaction() @SuppressWarnings("resource") @Test public void transactionManagerReadOnlySessionInPool() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -825,6 +873,9 @@ public void transactionManagerReadOnlySessionInPool() throws InterruptedExceptio @SuppressWarnings("resource") @Test public void transactionManagerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -847,6 +898,9 @@ public void transactionManagerSelect() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -870,6 +924,9 @@ public void transactionManagerRead() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -893,6 +950,9 @@ public void transactionManagerReadUsingIndex() throws InterruptedException { @Test public void transactionManagerReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -914,6 +974,9 @@ public void transactionManagerReadRow() throws InterruptedException { @Test public void transactionManagerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -937,6 +1000,9 @@ public void transactionManagerReadRowUsingIndex() throws InterruptedException { @Test public void transactionManagerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager(Options.commitStats())) { TransactionContext transaction = manager.begin(); while (true) { @@ -958,6 +1024,9 @@ public void transactionManagerUpdate() throws InterruptedException { @Test public void transactionManagerAborted_thenSessionNotFoundOnBeginTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); int attempt = 0; try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); @@ -990,6 +1059,9 @@ public void transactionManagerAborted_thenSessionNotFoundOnBeginTransaction() @Test public void transactionManagerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -1013,6 +1085,9 @@ public void transactionManagerBatchUpdate() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -1037,6 +1112,9 @@ public void transactionManagerBuffer() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerSelectInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1083,6 +1161,9 @@ public void transactionManagerSelectInvalidatedDuringTransaction() throws Interr @SuppressWarnings("resource") @Test public void transactionManagerReadInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1131,6 +1212,9 @@ public void transactionManagerReadInvalidatedDuringTransaction() throws Interrup @Test public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1180,6 +1264,9 @@ public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() @SuppressWarnings("resource") @Test public void transactionManagerReadRowInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1226,6 +1313,9 @@ public void transactionManagerReadRowInvalidatedDuringTransaction() throws Inter @Test public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1283,6 +1373,9 @@ public void partitionedDml() throws InterruptedException { @Test public void write() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); assertThrowsSessionNotFoundIfShouldFail( () -> client.write(Collections.singletonList(Mutation.delete("FOO", KeySet.all())))); } @@ -1300,17 +1393,26 @@ public void writeAtLeastOnce() throws InterruptedException { @Test public void asyncRunnerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction(input -> input.executeQueryAsync(SELECT1AND2)); } @Test public void asyncRunnerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction( input -> input.readAsync("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncRunnerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction( input -> input.readUsingIndexAsync( @@ -1319,6 +1421,9 @@ public void asyncRunnerReadUsingIndex() throws InterruptedException { private void asyncRunner_withReadFunction( final Function readFunction) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try { AsyncRunner runner = client.runAsync(); @@ -1356,6 +1461,9 @@ private void asyncRunner_withReadFunction( @Test public void asyncRunnerReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1367,6 +1475,9 @@ public void asyncRunnerReadRow() throws InterruptedException { @Test public void asyncRunnerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1380,6 +1491,9 @@ public void asyncRunnerReadRowUsingIndex() throws InterruptedException { @Test public void asyncRunnerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> get(runner.runAsync(txn -> txn.executeUpdateAsync(UPDATE_STATEMENT), executor))); @@ -1387,6 +1501,9 @@ public void asyncRunnerUpdate() throws InterruptedException { @Test public void asyncRunnerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1398,6 +1515,9 @@ public void asyncRunnerBatchUpdate() throws InterruptedException { @Test public void asyncRunnerBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1412,17 +1532,26 @@ public void asyncRunnerBuffer() throws InterruptedException { @Test public void asyncTransactionManagerAsyncSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync(input -> input.executeQueryAsync(SELECT1AND2)); } @Test public void asyncTransactionManagerAsyncRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync( input -> input.readAsync("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerAsyncReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync( input -> input.readUsingIndexAsync( @@ -1431,6 +1560,9 @@ public void asyncTransactionManagerAsyncReadUsingIndex() throws InterruptedExcep private void asyncTransactionManager_readAsync( final Function fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1475,17 +1607,26 @@ private void asyncTransactionManager_readAsync( @Test public void asyncTransactionManagerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync(input -> input.executeQuery(SELECT1AND2)); } @Test public void asyncTransactionManagerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync( input -> input.read("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync( input -> input.readUsingIndex("FOO", "idx", KeySet.all(), Collections.singletonList("BAR"))); @@ -1493,6 +1634,9 @@ public void asyncTransactionManagerReadUsingIndex() throws InterruptedException private void asyncTransactionManager_readSync(final Function fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1524,6 +1668,9 @@ private void asyncTransactionManager_readSync(final Function ApiFutures.immediateFuture( @@ -1532,6 +1679,9 @@ public void asyncTransactionManagerReadRow() throws InterruptedException { @Test public void asyncTransactionManagerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> ApiFutures.immediateFuture( @@ -1541,12 +1691,18 @@ public void asyncTransactionManagerReadRowUsingIndex() throws InterruptedExcepti @Test public void asyncTransactionManagerReadRowAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> input.readRowAsync("FOO", Key.of("foo"), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerReadRowUsingIndexAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> input.readRowUsingIndexAsync( @@ -1555,6 +1711,9 @@ public void asyncTransactionManagerReadRowUsingIndexAsync() throws InterruptedEx private void asyncTransactionManager_readRowFunction( final Function> fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1576,18 +1735,27 @@ private void asyncTransactionManager_readRowFunction( @Test public void asyncTransactionManagerUpdateAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> input.executeUpdateAsync(UPDATE_STATEMENT), UPDATE_COUNT); } @Test public void asyncTransactionManagerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> ApiFutures.immediateFuture(input.executeUpdate(UPDATE_STATEMENT)), UPDATE_COUNT); } @Test public void asyncTransactionManagerBatchUpdateAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> input.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)), new long[] {UPDATE_COUNT, UPDATE_COUNT}); @@ -1595,6 +1763,9 @@ public void asyncTransactionManagerBatchUpdateAsync() throws InterruptedExceptio @Test public void asyncTransactionManagerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> ApiFutures.immediateFuture( @@ -1604,6 +1775,9 @@ public void asyncTransactionManagerBatchUpdate() throws InterruptedException { private void asyncTransactionManager_updateFunction( final Function> fn, T expected) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture transaction = manager.beginAsync(); while (true) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index 4672f03aeff..8ccea443dc1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -171,6 +171,9 @@ public void testIgnoreLeakedSession() { @Test public void testReadWriteTransactionExceptionOnCreateSession() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); readWriteTransactionTest( () -> mockSpanner.setBatchCreateSessionsExecutionTime( @@ -180,6 +183,9 @@ public void testReadWriteTransactionExceptionOnCreateSession() { @Test public void testReadWriteTransactionExceptionOnBegin() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); readWriteTransactionTest( () -> mockSpanner.setBeginTransactionExecutionTime( @@ -200,6 +206,9 @@ private void readWriteTransactionTest( @Test public void testTransactionManagerExceptionOnCreateSession() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); transactionManagerTest( () -> mockSpanner.setBatchCreateSessionsExecutionTime( @@ -209,6 +218,9 @@ public void testTransactionManagerExceptionOnCreateSession() { @Test public void testTransactionManagerExceptionOnBegin() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); assertThat(pool.getNumberOfSessionsInPool(), is(equalTo(0))); mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofException(FAILED_PRECONDITION)); @@ -229,4 +241,11 @@ private void transactionManagerTest(Runnable setup, int expectedNumberOfSessions } assertEquals(expectedNumberOfSessionsAfterExecution, pool.getNumberOfSessionsInPool()); } + + private boolean isMultiplexedSessionsEnabledForRW() { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java index bd346c4f18b..14658210f44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java @@ -188,6 +188,8 @@ private SpannerOptions createSpannerOptions() { .setCompressorName("gzip") .setHost("http://" + endpoint) .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder().setSkipVerifyingBeginTransactionForMuxRW(true).build()) .build(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 10b13125152..aee3d5ed5b4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -234,6 +234,21 @@ public void usesPreparedTransaction() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( @@ -300,6 +315,21 @@ public void inlineBegin() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 1fd6817ea96..d8bd6ed448d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -186,6 +186,21 @@ public void usesPreparedTransaction() { .setCreateTime( Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java new file mode 100644 index 00000000000..91662ef8668 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.connection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RollbackRequest; +import io.grpc.Status; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RunTransactionMockServerTest extends AbstractMockServerTest { + + @Test + public void testRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionInAutoCommit() { + try (Connection connection = createConnection()) { + connection.setAutocommit(true); + + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionInReadOnly() { + try (Connection connection = createConnection()) { + connection.setReadOnly(true); + connection.setAutocommit(false); + + assertEquals( + RANDOM_RESULT_SET_ROW_COUNT, + connection + .runTransaction( + transaction -> { + int rows = 0; + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + while (resultSet.next()) { + rows++; + } + } + return rows; + }) + .intValue()); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } + + @Test + public void testRunTransaction_rollbacksAfterException() { + try (Connection connection = createConnection()) { + SpannerException exception = + assertThrows( + SpannerException.class, + () -> + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofException( + Status.INVALID_ARGUMENT + .withDescription("invalid statement") + .asRuntimeException())); + // This statement will fail. + transaction.executeUpdate(INSERT_STATEMENT); + return null; + })); + assertEquals(ErrorCode.INVALID_ARGUMENT, exception.getErrorCode()); + assertTrue(exception.getMessage(), exception.getMessage().endsWith("invalid statement")); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } + + @Test + public void testRunTransactionCommitAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + return null; + }); + } + assertEquals(2, attempts.get()); + assertEquals(4, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionDmlAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + assertTrue(connection.isRetryAbortsInternally()); + connection.runTransaction( + transaction -> { + assertFalse(transaction.isRetryAbortsInternally()); + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + assertTrue(connection.isRetryAbortsInternally()); + } + assertEquals(2, attempts.get()); + assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionQueryAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + int rowCount = + connection.runTransaction( + transaction -> { + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + int rows = 0; + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + while (resultSet.next()) { + rows++; + } + } + return rows; + }); + assertEquals(RANDOM_RESULT_SET_ROW_COUNT, rowCount); + } + assertEquals(2, attempts.get()); + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testCommitInRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + SpannerException exception = assertThrows(SpannerException.class, transaction::commit); + assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + assertEquals( + "FAILED_PRECONDITION: Cannot call commit when a transaction runner is active", + exception.getMessage()); + return null; + }); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRollbackInRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + SpannerException exception = + assertThrows(SpannerException.class, transaction::rollback); + assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + assertEquals( + "FAILED_PRECONDITION: Cannot call rollback when a transaction runner is active", + exception.getMessage()); + return null; + }); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } +} diff --git a/grpc-google-cloud-spanner-admin-database-v1/pom.xml b/grpc-google-cloud-spanner-admin-database-v1/pom.xml index 2ec9ebce7b9..79a7ca08bee 100644 --- a/grpc-google-cloud-spanner-admin-database-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-admin-database-v1 GRPC library for grpc-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml index 78a64b26cb1..435cf44ff07 100644 --- a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-admin-instance-v1 GRPC library for grpc-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-executor-v1/pom.xml b/grpc-google-cloud-spanner-executor-v1/pom.xml index de84666cd41..c0bc1b9b837 100644 --- a/grpc-google-cloud-spanner-executor-v1/pom.xml +++ b/grpc-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-executor-v1 GRPC library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml index 294952b7ecb..722150cf6c7 100644 --- a/grpc-google-cloud-spanner-v1/pom.xml +++ b/grpc-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-v1 GRPC library for grpc-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/pom.xml b/pom.xml index 46157f0f1ed..9945c18137e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-spanner-parent pom - 6.83.1-SNAPSHOT + 6.84.0 Google Cloud Spanner Parent https://github.com/googleapis/java-spanner @@ -61,47 +61,47 @@ com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-admin-database-v1/pom.xml b/proto-google-cloud-spanner-admin-database-v1/pom.xml index af791549cdb..b23496331a3 100644 --- a/proto-google-cloud-spanner-admin-database-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-admin-database-v1 PROTO library for proto-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-admin-instance-v1/pom.xml b/proto-google-cloud-spanner-admin-instance-v1/pom.xml index 96dfaf95ad2..76ccb0cf4b1 100644 --- a/proto-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-admin-instance-v1 PROTO library for proto-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-executor-v1/pom.xml b/proto-google-cloud-spanner-executor-v1/pom.xml index c37eb525cc6..301f105837f 100644 --- a/proto-google-cloud-spanner-executor-v1/pom.xml +++ b/proto-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-executor-v1 Proto library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-v1/pom.xml b/proto-google-cloud-spanner-v1/pom.xml index 16495a9b5c4..f42ee2b3e90 100644 --- a/proto-google-cloud-spanner-v1/pom.xml +++ b/proto-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-v1 PROTO library for proto-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 772e830d5ef..f44821f2838 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -32,7 +32,7 @@ com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/versions.txt b/versions.txt index 371f87c50c2..52488f51531 100644 --- a/versions.txt +++ b/versions.txt @@ -1,13 +1,13 @@ # Format: # module:released-version:current-version -proto-google-cloud-spanner-admin-instance-v1:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-v1:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-admin-database-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-admin-instance-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-admin-database-v1:6.83.0:6.83.1-SNAPSHOT -google-cloud-spanner:6.83.0:6.83.1-SNAPSHOT -google-cloud-spanner-executor:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-executor-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-executor-v1:6.83.0:6.83.1-SNAPSHOT +proto-google-cloud-spanner-admin-instance-v1:6.84.0:6.84.0 +proto-google-cloud-spanner-v1:6.84.0:6.84.0 +proto-google-cloud-spanner-admin-database-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-admin-instance-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-admin-database-v1:6.84.0:6.84.0 +google-cloud-spanner:6.84.0:6.84.0 +google-cloud-spanner-executor:6.84.0:6.84.0 +proto-google-cloud-spanner-executor-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-executor-v1:6.84.0:6.84.0