From 677d24f9b78dc0c1bd8134eb927ca4368cf09755 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Wed, 27 Sep 2023 13:41:52 -0500 Subject: [PATCH 1/6] Permit setting deadline per-call Adds a setDeadline method to the request builder, so that it can optionally be specified per call. --- .../grpc/internal/RequestBuilderImpl.scala | 21 +++++++++++++++++-- .../akka/grpc/scaladsl/RequestBuilder.scala | 7 +++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala index 02418fea5..ea4f36b63 100644 --- a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala @@ -4,11 +4,11 @@ package akka.grpc.internal -import java.util.concurrent.CompletionStage - +import java.util.concurrent.{ CompletionStage, TimeUnit } import akka.NotUsed import akka.annotation.{ InternalApi, InternalStableApi } import akka.dispatch.ExecutionContexts +import akka.grpc.scaladsl.SingleResponseRequestBuilder import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcServiceException, GrpcSingleResponse } import akka.stream.{ Graph, Materializer, SourceShape } import akka.stream.javadsl.{ Source => JavaSource } @@ -17,6 +17,7 @@ import akka.util.ByteString import io.grpc._ import scala.compat.java8.FutureConverters._ +import scala.concurrent.duration.Duration import scala.concurrent.{ ExecutionContext, Future } /** @@ -52,6 +53,14 @@ final class ScalaUnaryRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, O] = new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: Duration): SingleResponseRequestBuilder[I, O] = + new ScalaUnaryRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** @@ -152,6 +161,14 @@ final class ScalaClientStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): ScalaClientStreamingRequestBuilder[I, O] = new ScalaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: Duration): ScalaClientStreamingRequestBuilder[I, O] = + new ScalaClientStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** diff --git a/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala b/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala index 22debc4dc..b060b86ca 100644 --- a/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala +++ b/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala @@ -11,6 +11,7 @@ import akka.stream.scaladsl.Source import akka.util.ByteString import scala.concurrent.Future +import scala.concurrent.duration.Duration /** * Request builder for requests providing per call specific metadata capabilities in @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] { * Invoke the gRPC method with the additional metadata added and provide access to response metadata */ def invokeWithMetadata(request: Req): Future[GrpcSingleResponse[Res]] + + /** + * Set the deadline for this call + * @return A new request builder, that will use the supplied deadline when invoked + */ + def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res] } /** From 8bb2f4ae8c7f8aaf1869940727cc0d14d41e7e00 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Fri, 29 Sep 2023 23:23:41 -0500 Subject: [PATCH 2/6] streaming --- .../akka/grpc/internal/RequestBuilderImpl.scala | 16 ++++++++++++++++ .../akka/grpc/scaladsl/RequestBuilder.scala | 6 ++++++ 2 files changed, 22 insertions(+) diff --git a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala index ea4f36b63..63de35eab 100644 --- a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala @@ -259,6 +259,14 @@ final class ScalaServerStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): ScalaServerStreamingRequestBuilder[I, O] = new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: Duration): ScalaServerStreamingRequestBuilder[I, O] = + new ScalaServerStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** @@ -350,6 +358,14 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): ScalaBidirectionalStreamingRequestBuilder[I, O] = new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: Duration): ScalaBidirectionalStreamingRequestBuilder[I, O] = + new ScalaBidirectionalStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** diff --git a/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala b/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala index b060b86ca..1f84fb25f 100644 --- a/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala +++ b/runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala @@ -95,4 +95,10 @@ trait StreamResponseRequestBuilder[Req, Res] { * Invoke the gRPC method with the additional metadata added and provide access to response metadata */ def invokeWithMetadata(request: Req): Source[Res, Future[GrpcResponseMetadata]] + + /** + * Set the deadline for this call + * @return A new request builder, that will use the supplied deadline when invoked + */ + def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res] } From c8fd5bf8be518875cd42834a3fa1b73bd23ffe3f Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Fri, 29 Sep 2023 23:31:12 -0500 Subject: [PATCH 3/6] java --- .../grpc/internal/RequestBuilderImpl.scala | 33 +++++++++++++++++++ .../akka/grpc/javadsl/RequestBuilder.scala | 13 ++++++++ 2 files changed, 46 insertions(+) diff --git a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala index 63de35eab..622ad1d47 100644 --- a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala @@ -4,6 +4,7 @@ package akka.grpc.internal +import java.time.{ Duration => JDuration } import java.util.concurrent.{ CompletionStage, TimeUnit } import akka.NotUsed import akka.annotation.{ InternalApi, InternalStableApi } @@ -93,6 +94,14 @@ final class JavaUnaryRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): JavaUnaryRequestBuilder[I, O] = new JavaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: JDuration): JavaUnaryRequestBuilder[I, O] = + new JavaUnaryRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** @@ -212,6 +221,14 @@ final class JavaClientStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): JavaClientStreamingRequestBuilder[I, O] = new JavaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: JDuration): JavaClientStreamingRequestBuilder[I, O] = + new JavaClientStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** @@ -310,6 +327,14 @@ final class JavaServerStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): JavaServerStreamingRequestBuilder[I, O] = new JavaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: JDuration): JavaServerStreamingRequestBuilder[I, O] = + new JavaServerStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** @@ -410,6 +435,14 @@ final class JavaBidirectionalStreamingRequestBuilder[I, O]( override def withHeaders(headers: MetadataImpl): JavaBidirectionalStreamingRequestBuilder[I, O] = new JavaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) + + override def setDeadline(deadline: JDuration): JavaBidirectionalStreamingRequestBuilder[I, O] = + new JavaBidirectionalStreamingRequestBuilder[I, O]( + descriptor, + channel, + defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + settings, + headers) } /** diff --git a/runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala b/runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala index dfcc9f7d6..061010182 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala @@ -4,6 +4,7 @@ package akka.grpc.javadsl +import java.time.Duration import java.util.concurrent.CompletionStage import akka.NotUsed @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] { * Invoke the gRPC method with the additional metadata added and provide access to response metadata */ def invokeWithMetadata(request: Req): CompletionStage[GrpcSingleResponse[Res]] + + /** + * Set the deadline for this call + * @return A new request builder, that will use the supplied deadline when invoked + */ + def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res] } /** @@ -86,4 +93,10 @@ trait StreamResponseRequestBuilder[Req, Res] { * Invoke the gRPC method with the additional metadata added and provide access to response metadata */ def invokeWithMetadata(request: Req): Source[Res, CompletionStage[GrpcResponseMetadata]] + + /** + * Set the deadline for this call + * @return A new request builder, that will use the supplied deadline when invoked + */ + def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res] } From 34ab47a7b06daec00118a7b9f102821757865abf Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Thu, 12 Oct 2023 17:28:36 -0500 Subject: [PATCH 4/6] support clearing deadline --- .../grpc/internal/RequestBuilderImpl.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala index 622ad1d47..e1fcf7b9a 100644 --- a/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala @@ -59,7 +59,8 @@ final class ScalaUnaryRequestBuilder[I, O]( new ScalaUnaryRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (!deadline.isFinite) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -99,7 +100,8 @@ final class JavaUnaryRequestBuilder[I, O]( new JavaUnaryRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (deadline == null) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -175,7 +177,8 @@ final class ScalaClientStreamingRequestBuilder[I, O]( new ScalaClientStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (!deadline.isFinite) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -226,7 +229,8 @@ final class JavaClientStreamingRequestBuilder[I, O]( new JavaClientStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (deadline == null) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -281,7 +285,8 @@ final class ScalaServerStreamingRequestBuilder[I, O]( new ScalaServerStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (!deadline.isFinite) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -332,7 +337,8 @@ final class JavaServerStreamingRequestBuilder[I, O]( new JavaServerStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (deadline == null) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -388,7 +394,8 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O]( new ScalaBidirectionalStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (!deadline.isFinite) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } @@ -440,7 +447,8 @@ final class JavaBidirectionalStreamingRequestBuilder[I, O]( new JavaBidirectionalStreamingRequestBuilder[I, O]( descriptor, channel, - defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), + if (deadline == null) defaultOptions.withDeadline(null) + else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS), settings, headers) } From 9ec05cb73ea818a520329f8dc54407e0ccad8b83 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Thu, 12 Oct 2023 17:31:08 -0500 Subject: [PATCH 5/6] mima --- .../1838-deadline-per-call.excludes | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes diff --git a/runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes b/runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes new file mode 100644 index 000000000..445d65b8a --- /dev/null +++ b/runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes @@ -0,0 +1,6 @@ +# New APIs for setting deadline per call + +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.SingleResponseRequestBuilder.setDeadline") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.StreamResponseRequestBuilder.setDeadline") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.SingleResponseRequestBuilder.setDeadline") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.StreamResponseRequestBuilder.setDeadline") From e1887a1bb633dfc9f2e913317448eaa768720049 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Fri, 13 Oct 2023 13:23:57 -0500 Subject: [PATCH 6/6] move mima filter to 2.4.0-M1 --- .../1838-deadline-per-call.excludes | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename runtime/src/main/mima-filters/{2.3.x.backwards.excludes => 2.4.0-M1.backwards.excludes}/1838-deadline-per-call.excludes (100%) diff --git a/runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes b/runtime/src/main/mima-filters/2.4.0-M1.backwards.excludes/1838-deadline-per-call.excludes similarity index 100% rename from runtime/src/main/mima-filters/2.3.x.backwards.excludes/1838-deadline-per-call.excludes rename to runtime/src/main/mima-filters/2.4.0-M1.backwards.excludes/1838-deadline-per-call.excludes