diff --git a/codegen/src/main/twirl/templates/JavaClient/Client.scala.txt b/codegen/src/main/twirl/templates/JavaClient/Client.scala.txt index 5fac03367..17b62af9c 100644 --- a/codegen/src/main/twirl/templates/JavaClient/Client.scala.txt +++ b/codegen/src/main/twirl/templates/JavaClient/Client.scala.txt @@ -12,6 +12,8 @@ import akka.stream.Materializer; import akka.stream.SystemMaterializer; import akka.grpc.internal.*; +import akka.grpc.GrpcChannel; +import akka.grpc.GrpcClientCloseException; import akka.grpc.GrpcClientSettings; import akka.grpc.javadsl.AkkaGrpcClient; @@ -28,26 +30,29 @@ import akka.grpc.AkkaGrpcGenerated; @@AkkaGrpcGenerated public abstract class @{service.name}Client extends @{service.name}ClientPowerApi implements @{service.name}, AkkaGrpcClient { public static final @{service.name}Client create(GrpcClientSettings settings, ClassicActorSystemProvider sys) { - return new Default@{service.name}Client(settings, sys); + return new Default@{service.name}Client(akka.grpc.GrpcChannel$.MODULE$.apply(settings, sys), true, sys); + } + + public static final @{service.name}Client create(GrpcChannel channel, ClassicActorSystemProvider sys) { + return new Default@{service.name}Client(channel, false, sys); } @@AkkaGrpcGenerated protected final static class Default@{service.name}Client extends @{service.name}Client { - private final ClientState clientState; + private final GrpcChannel channel; + private final boolean isChannelOwned; private final GrpcClientSettings settings; private final io.grpc.CallOptions options; private final Materializer mat; private final ExecutionContext ec; - private Default@{service.name}Client(GrpcClientSettings settings, ClassicActorSystemProvider sys) { - this.settings = settings; + private Default@{service.name}Client(GrpcChannel channel, boolean isChannelOwned, ClassicActorSystemProvider sys) { + this.channel = channel; + this.isChannelOwned = isChannelOwned; + this.settings = channel.settings(); this.mat = SystemMaterializer.get(sys).materializer(); this.ec = sys.classicSystem().dispatcher(); - this.clientState = new ClientState( - settings, - akka.event.Logging$.MODULE$.apply(sys.classicSystem(), Default@{service.name}Client.class, akka.event.LogSource$.MODULE$.fromAnyClass()), - sys); this.options = NettyClientUtils.callOptions(settings); sys.classicSystem().getWhenTerminated().whenComplete((v, e) -> close()); @@ -101,7 +106,7 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp public StreamResponseRequestBuilder, @method.outputTypeUnboxed> @{method.name}() } { - return @{method.name}RequestBuilder(clientState.internalChannel()); + return @{method.name}RequestBuilder(channel.internalChannel()); } } @@ -120,7 +125,11 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp * Initiates a shutdown in which preexisting and new calls are cancelled. */ public java.util.concurrent.CompletionStage close() { - return clientState.closeCS() ; + if (isChannelOwned) { + return channel.closeCS(); + } else { + throw new GrpcClientCloseException(); + } } /** @@ -128,7 +137,7 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp * or exceptionally if a connection can not be established after maxConnectionAttempts. */ public java.util.concurrent.CompletionStage closed() { - return clientState.closedCS(); + return channel.closedCS(); } } diff --git a/codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt b/codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt index 5cd8c3023..20c1f3742 100644 --- a/codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt +++ b/codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt @@ -11,12 +11,13 @@ import scala.concurrent.ExecutionContext import akka.actor.ClassicActorSystemProvider +import akka.grpc.GrpcChannel +import akka.grpc.GrpcClientCloseException import akka.grpc.GrpcClientSettings import akka.grpc.scaladsl.AkkaGrpcClient import akka.grpc.internal.NettyClientUtils -import akka.grpc.internal.ClientState import akka.grpc.AkkaGrpcGenerated @@ -43,62 +44,58 @@ trait @{service.name}Client extends @{service.name} with @{service.name}ClientPo @@AkkaGrpcGenerated object @{service.name}Client { def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client = - new Default@{service.name}Client(settings) -} - -@@AkkaGrpcGenerated -final class Default@{service.name}Client(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client { - import @{service.name}.MethodDescriptors._ + new Default@{service.name}Client(GrpcChannel(settings), isChannelOwned = true) + def apply(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider): @{service.name}Client = + new Default@{service.name}Client(channel, isChannelOwned = false) + + private class Default@{service.name}Client(channel: GrpcChannel, isChannelOwned: Boolean)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client { + import @{service.name}.MethodDescriptors._ + + private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher + private val settings = channel.settings + private val options = NettyClientUtils.callOptions(settings) + + @for(method <- service.methods) { + private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) = + @if(method.methodType == akka.grpc.gen.Unary) { + new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings) + } else { + @if(method.methodType == akka.grpc.gen.ServerStreaming) { + new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) + } else if(method.methodType == akka.grpc.gen.ClientStreaming) { + new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) + } else if (method.methodType == akka.grpc.gen.BidiStreaming) { + new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) + } + } + } - private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher - private val options = NettyClientUtils.callOptions(settings) - private val clientState = new ClientState(settings, akka.event.Logging(sys.classicSystem, classOf[Default@{service.name}Client])) + @for(method <- service.methods) { + /** + * Lower level "lifted" version of the method, giving access to request metadata etc. + * prefer @{method.nameSafe}(@method.parameterType) if possible. + */ + @if(method.methodType == akka.grpc.gen.Unary || method.methodType == akka.grpc.gen.ClientStreaming) { + override def @{method.nameSafe}(): SingleResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] = + @{method.name}RequestBuilder(channel.internalChannel) + } else { + override def @{method.nameSafe}(): StreamResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] = + @{method.name}RequestBuilder(channel.internalChannel) + } - @for(method <- service.methods) { - private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) = - @if(method.methodType == akka.grpc.gen.Unary) { - new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings) - } else { - @if(method.methodType == akka.grpc.gen.ServerStreaming) { - new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) - } else if(method.methodType == akka.grpc.gen.ClientStreaming) { - new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) - } else if (method.methodType == akka.grpc.gen.BidiStreaming) { - new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings) + /** + * For access to method metadata use the parameterless version of @{method.nameSafe} + */ + def @{method.nameSafe}(in: @method.parameterType): @method.returnType = + @{method.nameSafe}().invoke(in) } - } - } - @for(method <- service.methods) { - /** - * Lower level "lifted" version of the method, giving access to request metadata etc. - * prefer @{method.nameSafe}(@method.parameterType) if possible. - */ - @if(method.methodType == akka.grpc.gen.Unary || method.methodType == akka.grpc.gen.ClientStreaming) { - override def @{method.nameSafe}(): SingleResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] = - @{method.name}RequestBuilder(clientState.internalChannel) - } else { - override def @{method.nameSafe}(): StreamResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] = - @{method.name}RequestBuilder(clientState.internalChannel) - } + override def close(): scala.concurrent.Future[akka.Done] = + if (isChannelOwned) channel.close() + else throw new GrpcClientCloseException() - /** - * For access to method metadata use the parameterless version of @{method.nameSafe} - */ - def @{method.nameSafe}(in: @method.parameterType): @method.returnType = - @{method.nameSafe}().invoke(in) + override def closed: scala.concurrent.Future[akka.Done] = channel.closed() } - - override def close(): scala.concurrent.Future[akka.Done] = clientState.close() - override def closed: scala.concurrent.Future[akka.Done] = clientState.closed() - -} - -@@AkkaGrpcGenerated -object Default@{service.name}Client { - - def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client = - new Default@{service.name}Client(settings) } @@AkkaGrpcGenerated diff --git a/docs/src/main/paradox/client/details.md b/docs/src/main/paradox/client/details.md index c471f6ae4..990ff381b 100644 --- a/docs/src/main/paradox/client/details.md +++ b/docs/src/main/paradox/client/details.md @@ -17,7 +17,22 @@ reconnect is infinite and configurable via `GrpcClientSettings`'s `connectionAtt The client offers a method `closed()` that returns a @scala[`Future`]@java[`CompletionStage`] that will complete once the client is explicitly closed after invoking `close()`. The returned @scala[`Future`]@java[`CompletionStage`] -will complete with a failure when the maximum number of `connectionAttempts` (which causes a shutdown). +will complete with a failure when the maximum number of `connectionAttempts` (which causes a shutdown). + +## Shared Channels + +By default, each instance of a generated client creates a separate HTTP connection to the server. If the server +supports multiple services, you may want to allow multiple generated clients to share a single connection. + +To do this, create a @apidoc[GrpcChannel] by passing @apidoc[GrpcClientSettings] to the apply method. You can then +use the GrpcChannel instance to create multiple generated clients; each client will use the provided channel to +communicate with the server. + +When using a shared channel, the client lifecycle changes slightly. Like the generated client, `GrpcChannel` offers +`close` and `closed` methods; these can be used to explicitly close the connection to the server and detect when the +connection has been closed or shutdown due to errors, respectively. When you are done communicating with the server, +you should call `close` on the channel, rather than the individual clients. Calling `close` on a generated client +that was created with a shared channel will throw a @apidoc[GrpcClientCloseException]. ## Load balancing diff --git a/interop-tests/src/test/resources/application.conf b/interop-tests/src/test/resources/application.conf index 39544bf89..1cf8e7fd3 100644 --- a/interop-tests/src/test/resources/application.conf +++ b/interop-tests/src/test/resources/application.conf @@ -5,5 +5,6 @@ akka.http { server { default-host-header = "localhost.com" preview.enable-http2 = on + remote-address-attribute = on } } \ No newline at end of file diff --git a/interop-tests/src/test/scala/akka/grpc/scaladsl/GrpcChannelSpec.scala b/interop-tests/src/test/scala/akka/grpc/scaladsl/GrpcChannelSpec.scala new file mode 100644 index 000000000..17f63cf08 --- /dev/null +++ b/interop-tests/src/test/scala/akka/grpc/scaladsl/GrpcChannelSpec.scala @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2020-2021 Lightbend Inc. + */ + +package akka.grpc.scaladsl + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.grpc.scaladsl.tools.MutableServiceDiscovery +import akka.grpc.{ GrpcChannel, GrpcClientCloseException, GrpcClientSettings } +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.RemoteAddress +import akka.http.scaladsl.server.Directives +import akka.stream.SystemMaterializer +import com.typesafe.config.{ Config, ConfigFactory } +import example.myapp.helloworld.grpc.helloworld._ +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpec + +class GrpcClientSpecNetty extends GrpcChannelSpec() + +class GrpcChannelSpec(config: Config = ConfigFactory.load()) + extends AnyWordSpec + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + implicit val system = ActorSystem("GrpcChannelSpec", config) + implicit val mat = SystemMaterializer(system).materializer + implicit val ec = system.dispatcher + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis)) + + private val clientAddresses = new java.util.concurrent.ConcurrentHashMap[RemoteAddress.IP, Unit] + private val service = new CountingGreeterServiceImpl() + private val handler = GreeterServiceHandler(service) + private val route = Directives.extractClientIP { clientIp => + clientAddresses.put(clientIp.toIP.get, ()) + Directives.handle(handler) + } + + private val server = Http().newServerAt("127.0.0.1", 0).bind(route).futureValue + + private val discovery = MutableServiceDiscovery(List(server)) + private val settings = GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false) + + "GrpcChannel" should { + "create separate connections for separate channels" in { + clientAddresses.clear() + + val greeterClient1 = GreeterServiceClient(settings) + greeterClient1.sayHello(HelloRequest(s"Hello 1")).futureValue + + val greeterClient2 = GreeterServiceClient(settings) + greeterClient2.sayHello(HelloRequest(s"Hello 2")).futureValue + + clientAddresses.size should be(2) + } + + "reuse a single connection for a shared channel" in { + clientAddresses.clear() + + val channel = GrpcChannel(settings) + + val greeterClient1 = GreeterServiceClient(channel) + greeterClient1.sayHello(HelloRequest(s"Hello 0")).futureValue + + val greeterClient2 = GreeterServiceClient(channel) + greeterClient2.sayHello(HelloRequest(s"Hello 1")).futureValue + + clientAddresses.size should be(1) + } + } + + "GrpcClient" should { + "allow close on owned connection" in { + val greeterClient = GreeterServiceClient(settings) + greeterClient.sayHello(HelloRequest("Hello")).futureValue + greeterClient.close().futureValue + } + + "throw an exception when closing a shared connection" in { + val channel = GrpcChannel(settings) + val greeterClient = GreeterServiceClient(channel) + greeterClient.sayHello(HelloRequest("Hello")).futureValue + assertThrows[GrpcClientCloseException] { + greeterClient.close().futureValue + } + channel.close().futureValue + } + } + + override def afterAll(): Unit = { + Await.result(system.terminate(), 10.seconds) + } +} diff --git a/runtime/src/main/mima-filters/2.1.4.backwards.excludes/1589-grpc-channel.excludes b/runtime/src/main/mima-filters/2.1.4.backwards.excludes/1589-grpc-channel.excludes new file mode 100644 index 000000000..6e111123f --- /dev/null +++ b/runtime/src/main/mima-filters/2.1.4.backwards.excludes/1589-grpc-channel.excludes @@ -0,0 +1,15 @@ +# InternalApi: Removed ChannelUtils.closeCS method, unused since 31bb602. +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ChannelUtils.closeCS") + +# InternalApi: Added private modifier to ClientState primary constructor only called by auxiliary constructor. +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ClientState.this") + +# internal: Removed createPool method only called by a ClientState auxiliary constructor. +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ClientState.createPool") + +# internal: Removed ClientState object only used by ClientState auxiliary constructor. +ProblemFilters.exclude[MissingClassProblem]("akka.grpc.internal.ClientState$") + +# Generated: Made default service client a private inner class of servie client object. +ProblemFilters.exclude[MissingClassProblem]("grpc.reflection.v1alpha.reflection.DefaultServerReflectionClient") +ProblemFilters.exclude[MissingClassProblem]("grpc.reflection.v1alpha.reflection.DefaultServerReflectionClient$") diff --git a/runtime/src/main/scala/akka/grpc/GrpcChannel.scala b/runtime/src/main/scala/akka/grpc/GrpcChannel.scala new file mode 100644 index 000000000..b403ff80a --- /dev/null +++ b/runtime/src/main/scala/akka/grpc/GrpcChannel.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +package akka.grpc + +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ +import scala.concurrent.Future + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.annotation.InternalStableApi +import akka.grpc.internal.{ ChannelUtils, InternalChannel } +import akka.grpc.scaladsl.Grpc + +final class GrpcChannel private ( + @InternalStableApi val settings: GrpcClientSettings, + @InternalStableApi val internalChannel: InternalChannel)(implicit sys: ClassicActorSystemProvider) { + + Grpc(sys).registerChannel(this) + + /** + * Java API: Initiates a shutdown in which preexisting and new calls are cancelled. + */ + def closeCS(): CompletionStage[Done] = + close().toJava + + /** + * Java API: Returns a CompletionStage that completes successfully when channel is shut down via close(), + * or exceptionally if connection cannot be established or reestablished after maxConnectionAttempts. + */ + def closedCS(): CompletionStage[Done] = + closed().toJava + + /** + * Scala API: Initiates a shutdown in which preexisting and new calls are cancelled. + */ + def close(): Future[akka.Done] = { + Grpc(sys).deregisterChannel(this) + ChannelUtils.close(internalChannel) + } + + /** + * Scala API: Returns a Future that completes successfully when channel is shut down via close() + * or exceptionally if a connection cannot be established or reestablished after maxConnectionAttempts. + */ + def closed(): Future[akka.Done] = + internalChannel.done +} + +object GrpcChannel { + def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): GrpcChannel = { + new GrpcChannel( + settings, + ChannelUtils.create(settings, akka.event.Logging(sys.classicSystem, classOf[GrpcChannel]))) + } +} diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientCloseException.scala b/runtime/src/main/scala/akka/grpc/GrpcClientCloseException.scala new file mode 100644 index 000000000..361550589 --- /dev/null +++ b/runtime/src/main/scala/akka/grpc/GrpcClientCloseException.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +package akka.grpc + +/** + * Thrown if close() is called on a client that uses a shared channel. + */ +final class GrpcClientCloseException() + extends IllegalStateException("Client close() should not be called when using a shared channel") diff --git a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala index f944a3896..237455788 100644 --- a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala @@ -4,14 +4,15 @@ package akka.grpc.internal -import java.util.concurrent.CompletionStage - import akka.Done + +import akka.actor.ClassicActorSystemProvider import akka.annotation.InternalApi import akka.event.LoggingAdapter +import akka.grpc.GrpcClientSettings + import io.grpc.{ ConnectivityState, ManagedChannel } -import scala.compat.java8.FutureConverters._ import scala.concurrent.{ Future, Promise } /** @@ -33,17 +34,25 @@ object ChannelUtils { * INTERNAL API */ @InternalApi - def close(internalChannel: InternalChannel): Future[Done] = { - internalChannel.shutdown() - internalChannel.done + private[akka] def create(settings: GrpcClientSettings, log: LoggingAdapter)( + implicit sys: ClassicActorSystemProvider): InternalChannel = { + settings.backend match { + case "netty" => + NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher) + case "akka-http" => + AkkaHttpClientUtils.createChannel(settings, log) + case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]") + } } /** * INTERNAL API */ @InternalApi - def closeCS(internalChannel: InternalChannel): CompletionStage[Done] = - close(internalChannel).toJava + def close(internalChannel: InternalChannel): Future[Done] = { + internalChannel.shutdown() + internalChannel.done + } /** * INTERNAL API diff --git a/runtime/src/main/scala/akka/grpc/internal/ClientState.scala b/runtime/src/main/scala/akka/grpc/internal/ClientState.scala index 5bc832240..5ee0bbc18 100644 --- a/runtime/src/main/scala/akka/grpc/internal/ClientState.scala +++ b/runtime/src/main/scala/akka/grpc/internal/ClientState.scala @@ -6,54 +6,37 @@ package akka.grpc.internal import java.util.concurrent.CompletionStage +import scala.concurrent.Future + import akka.Done import akka.actor.ClassicActorSystemProvider -import akka.annotation.InternalApi -import akka.annotation.InternalStableApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.event.LoggingAdapter -import akka.grpc.GrpcClientSettings -import akka.grpc.scaladsl.Grpc - -import scala.compat.java8.FutureConverters._ -import scala.concurrent.Future +import akka.grpc.{ GrpcChannel, GrpcClientSettings } /** * INTERNAL API * - * Client utilities taking care of Channel reconnection and Channel lifecycle in general. + * Deprecated: This class wraps a GrpcChannel for compatibility with clients generated by previous versions. */ +@deprecated("Kept for binary compatibility between generated code and runtime", "akka-grpc 2.1.5") @InternalApi -final class ClientState(@InternalStableApi val internalChannel: InternalChannel)( - implicit sys: ClassicActorSystemProvider) { +final class ClientState private (channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider) { @InternalStableApi - def this(settings: GrpcClientSettings, log: LoggingAdapter)(implicit sys: ClassicActorSystemProvider) = - this(ClientState.createPool(settings, log)) + val internalChannel: InternalChannel = + channel.internalChannel - Grpc(sys).registerClient(this) - - def closedCS(): CompletionStage[Done] = closed().toJava - def closeCS(): CompletionStage[Done] = close().toJava + @InternalStableApi + def this(settings: GrpcClientSettings, log: LoggingAdapter)(implicit sys: ClassicActorSystemProvider) = + this(GrpcChannel(settings)) - def closed(): Future[Done] = internalChannel.done + def closedCS(): CompletionStage[Done] = channel.closedCS() + def closeCS(): CompletionStage[Done] = channel.closeCS() - def close(): Future[Done] = { - Grpc(sys).deregisterClient(this) - ChannelUtils.close(internalChannel) - } -} + def closed(): Future[Done] = channel.closed() -object ClientState { - def createPool(settings: GrpcClientSettings, log: LoggingAdapter)( - implicit sys: ClassicActorSystemProvider): InternalChannel = { - settings.backend match { - case "netty" => - NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher) - case "akka-http" => - AkkaHttpClientUtils.createChannel(settings, log) - case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]") - } - } + def close(): Future[Done] = channel.close() } /** diff --git a/runtime/src/main/scala/akka/grpc/javadsl/AkkaGrpcClient.scala b/runtime/src/main/scala/akka/grpc/javadsl/AkkaGrpcClient.scala index 502b5e7d5..528fcf7e1 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/AkkaGrpcClient.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/AkkaGrpcClient.scala @@ -12,6 +12,15 @@ import akka.annotation.DoNotInherit /** Common trait of all generated Akka gRPC clients. Not for user extension. */ @DoNotInherit trait AkkaGrpcClient { + + /** + * Initiates a shutdown in which preexisting and new calls are cancelled. + * + * This method is only valid for clients that use an internal channel. If the client was created + * with a shared, user-provided channel, the channel itself should be closed. + * + * @throws akka.grpc.GrpcClientCloseException if client was created with a user-provided [[akka.grpc.GrpcChannel]]. + */ def close(): CompletionStage[Done] def closed(): CompletionStage[Done] } diff --git a/runtime/src/main/scala/akka/grpc/scaladsl/AkkaGrpcClient.scala b/runtime/src/main/scala/akka/grpc/scaladsl/AkkaGrpcClient.scala index a3e8c1e27..a0b19e668 100644 --- a/runtime/src/main/scala/akka/grpc/scaladsl/AkkaGrpcClient.scala +++ b/runtime/src/main/scala/akka/grpc/scaladsl/AkkaGrpcClient.scala @@ -15,6 +15,11 @@ trait AkkaGrpcClient { /** * Initiates a shutdown in which preexisting and new calls are cancelled. + * + * This method is only valid for clients that use an internal channel. If the client was created + * with a shared user-provided channel, the channel itself should be closed. + * + * @throws akka.grpc.GrpcClientCloseException if client was created with a user-provided [[akka.grpc.GrpcChannel]]. */ def close(): Future[Done] diff --git a/runtime/src/main/scala/akka/grpc/scaladsl/Grpc.scala b/runtime/src/main/scala/akka/grpc/scaladsl/Grpc.scala index 7acee3446..9f84ece3d 100644 --- a/runtime/src/main/scala/akka/grpc/scaladsl/Grpc.scala +++ b/runtime/src/main/scala/akka/grpc/scaladsl/Grpc.scala @@ -10,28 +10,28 @@ import scala.jdk.CollectionConverters._ import akka.Done import akka.actor.{ CoordinatedShutdown, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi -import akka.grpc.internal.ClientState import java.util.concurrent.ConcurrentHashMap import akka.event.Logging +import akka.grpc.GrpcChannel /** INTERNAL API */ @InternalApi private[grpc] final class GrpcImpl(system: ExtendedActorSystem) extends Extension { - private val clients = new ConcurrentHashMap[ClientState, Unit] + private val channels = new ConcurrentHashMap[GrpcChannel, Unit] - CoordinatedShutdown(system).addTask("before-actor-system-terminate", "close-grpc-clients") { () => + CoordinatedShutdown(system).addTask("before-actor-system-terminate", "close-grpc-channels") { () => implicit val ec = system.dispatcher Future .sequence( - clients + channels .keySet() .asScala - .map(client => - client.close().recover { + .map(channel => + channel.close().recover { case e => val log = Logging(system, getClass) - log.warning("Failed to gracefully close {}, proceeding with shutdown anyway. {}", client, e) + log.warning("Failed to gracefully close {}, proceeding with shutdown anyway. {}", channel, e) Done })) .map(_ => Done) @@ -39,13 +39,14 @@ private[grpc] final class GrpcImpl(system: ExtendedActorSystem) extends Extensio /** INTERNAL API */ @InternalApi - def registerClient(client: ClientState): Unit = - clients.put(client, ()) + def registerChannel(channel: GrpcChannel): Unit = + channels.put(channel, ()) /** INTERNAL API */ @InternalApi - def deregisterClient(client: ClientState): Unit = - clients.remove(client) + def deregisterChannel(channel: GrpcChannel): Unit = + channels.remove(channel) + } /** INTERNAL API */ diff --git a/runtime/src/test/scala/akka/grpc/internal/ClientStateSpec.scala b/runtime/src/test/scala/akka/grpc/internal/ClientStateSpec.scala deleted file mode 100644 index 4eef3a709..000000000 --- a/runtime/src/test/scala/akka/grpc/internal/ClientStateSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (C) 2018-2021 Lightbend Inc. - */ - -package akka.grpc.internal - -import scala.concurrent.duration._ -import scala.concurrent.{ Future, Promise } -import akka.{ Done, NotUsed } -import akka.actor.ActorSystem -import akka.grpc.{ GrpcResponseMetadata, GrpcSingleResponse } -import akka.stream.scaladsl.Source -import io.grpc.{ CallOptions, MethodDescriptor } -import org.scalatest.concurrent.{ Eventually, ScalaFutures } -import org.scalatest.BeforeAndAfterAll -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class ClientStateSpec extends AnyWordSpec with Matchers with ScalaFutures with Eventually with BeforeAndAfterAll { - implicit val sys = ActorSystem() - implicit val ec = sys.dispatcher - implicit val patience = PatienceConfig(timeout = 10.seconds, interval = 150.milliseconds) - - private def clientState(channelCompletion: Promise[Done] = Promise[Done]()) = { - val channel = - new InternalChannel() { - override def invoke[I, O]( - request: I, - headers: MetadataImpl, - descriptor: MethodDescriptor[I, O], - options: CallOptions): Future[O] = ??? - override def invokeWithMetadata[I, O]( - request: I, - headers: MetadataImpl, - descriptor: MethodDescriptor[I, O], - options: CallOptions): Future[GrpcSingleResponse[O]] = ??? - override def invokeWithMetadata[I, O]( - source: Source[I, NotUsed], - headers: MetadataImpl, - descriptor: MethodDescriptor[I, O], - streamingResponse: Boolean, - options: CallOptions): Source[O, Future[GrpcResponseMetadata]] = ??? - override def shutdown(): Unit = channelCompletion.success(Done) - override def done: Future[Done] = channelCompletion.future - } - new ClientState(channel) - } - - "Client State" should { - "successfully provide a channel" in { - // given a state - val state = clientState() - // it provides a channel when needed - state.internalChannel should not be null - } - } - - override def afterAll(): Unit = { - super.afterAll() - sys.terminate() - } -}