Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for service client connection reuse. #1589

Merged
merged 13 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions codegen/src/main/twirl/templates/JavaClient/Client.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import akka.stream.Materializer;
import akka.stream.SystemMaterializer;

import akka.grpc.internal.*;
import akka.grpc.GrpcChannel;
import akka.grpc.GrpcClientCloseException;
import akka.grpc.GrpcClientSettings;
import akka.grpc.javadsl.AkkaGrpcClient;

Expand All @@ -28,26 +30,29 @@ import akka.grpc.AkkaGrpcGenerated;
@@AkkaGrpcGenerated
public abstract class @{service.name}Client extends @{service.name}ClientPowerApi implements @{service.name}, AkkaGrpcClient {
public static final @{service.name}Client create(GrpcClientSettings settings, ClassicActorSystemProvider sys) {
return new Default@{service.name}Client(settings, sys);
return new Default@{service.name}Client(akka.grpc.GrpcChannel$.MODULE$.apply(settings, sys), true, sys);
}

public static final @{service.name}Client create(GrpcChannel channel, ClassicActorSystemProvider sys) {
return new Default@{service.name}Client(channel, false, sys);
}

@@AkkaGrpcGenerated
protected final static class Default@{service.name}Client extends @{service.name}Client {

private final ClientState clientState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can remove the ClientState type entirely: to achieve the binary compatibly promise we do in https://doc.akka.io/docs/akka-grpc/current/binary-compatibility.html , code generated with previous Akka gRPC versions should still work when a newer version of the runtime library is on the classpath. So I think we need to keep at least a deprecated 'wrapper' ClientState around, that will make sure 'older' generated code keeps working.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the feedback!

As you suggested, I recreated ClientState as a wrapper around a GrpcChannel. I think it properly supports the constructor and methods that are used by older generated clients.

private final GrpcChannel channel;
private final boolean isChannelOwned;
private final GrpcClientSettings settings;
private final io.grpc.CallOptions options;
private final Materializer mat;
private final ExecutionContext ec;

private Default@{service.name}Client(GrpcClientSettings settings, ClassicActorSystemProvider sys) {
this.settings = settings;
private Default@{service.name}Client(GrpcChannel channel, boolean isChannelOwned, ClassicActorSystemProvider sys) {
this.channel = channel;
this.isChannelOwned = isChannelOwned;
this.settings = channel.settings();
this.mat = SystemMaterializer.get(sys).materializer();
this.ec = sys.classicSystem().dispatcher();
this.clientState = new ClientState(
settings,
akka.event.Logging$.MODULE$.apply(sys.classicSystem(), Default@{service.name}Client.class, akka.event.LogSource$.MODULE$.<Default@{service.name}Client>fromAnyClass()),
sys);
this.options = NettyClientUtils.callOptions(settings);

sys.classicSystem().getWhenTerminated().whenComplete((v, e) -> close());
Expand Down Expand Up @@ -101,7 +106,7 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
public StreamResponseRequestBuilder<akka.stream.javadsl.Source<@method.inputTypeUnboxed, akka.NotUsed>, @method.outputTypeUnboxed> @{method.name}()
}
{
return @{method.name}RequestBuilder(clientState.internalChannel());
return @{method.name}RequestBuilder(channel.internalChannel());
}
}

Expand All @@ -120,15 +125,19 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
* Initiates a shutdown in which preexisting and new calls are cancelled.
*/
public java.util.concurrent.CompletionStage<akka.Done> close() {
return clientState.closeCS() ;
if (isChannelOwned) {
return channel.closeCS();
} else {
throw new GrpcClientCloseException();
}
}

/**
* Returns a CompletionState that completes successfully when shutdown via close()
* or exceptionally if a connection can not be established after maxConnectionAttempts.
*/
public java.util.concurrent.CompletionStage<akka.Done> closed() {
return clientState.closedCS();
return channel.closedCS();
}
}

Expand Down
99 changes: 48 additions & 51 deletions codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import scala.concurrent.ExecutionContext

import akka.actor.ClassicActorSystemProvider

import akka.grpc.GrpcChannel
import akka.grpc.GrpcClientCloseException
import akka.grpc.GrpcClientSettings

import akka.grpc.scaladsl.AkkaGrpcClient

import akka.grpc.internal.NettyClientUtils
import akka.grpc.internal.ClientState

import akka.grpc.AkkaGrpcGenerated

Expand All @@ -43,62 +44,58 @@ trait @{service.name}Client extends @{service.name} with @{service.name}ClientPo
@@AkkaGrpcGenerated
object @{service.name}Client {
def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(settings)
}

@@AkkaGrpcGenerated
final class Default@{service.name}Client(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
import @{service.name}.MethodDescriptors._
new Default@{service.name}Client(GrpcChannel(settings), isChannelOwned = true)
def apply(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(channel, isChannelOwned = false)

private class Default@{service.name}Client(channel: GrpcChannel, isChannelOwned: Boolean)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the default service client private to the service object feels like an improvement to me. The default client was already marked final to prevent subclassing, and user should be using the apply() method on the service client object rather than creating default clients directly. Making the class private to the service client just enforces these rules more strongly, and brings the generated scala client into alignment with the generated java client.

That said, I'm still calibrating my application of the Boy Scout Rule. Let me know if this goes too far.

import @{service.name}.MethodDescriptors._

private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher
private val settings = channel.settings
private val options = NettyClientUtils.callOptions(settings)

@for(method <- service.methods) {
private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) =
@if(method.methodType == akka.grpc.gen.Unary) {
new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else {
@if(method.methodType == akka.grpc.gen.ServerStreaming) {
new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else if(method.methodType == akka.grpc.gen.ClientStreaming) {
new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else if (method.methodType == akka.grpc.gen.BidiStreaming) {
new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
}
}
}

private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher
private val options = NettyClientUtils.callOptions(settings)
private val clientState = new ClientState(settings, akka.event.Logging(sys.classicSystem, classOf[Default@{service.name}Client]))
@for(method <- service.methods) {
/**
* Lower level "lifted" version of the method, giving access to request metadata etc.
* prefer @{method.nameSafe}(@method.parameterType) if possible.
*/
@if(method.methodType == akka.grpc.gen.Unary || method.methodType == akka.grpc.gen.ClientStreaming) {
override def @{method.nameSafe}(): SingleResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(channel.internalChannel)
} else {
override def @{method.nameSafe}(): StreamResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(channel.internalChannel)
}

@for(method <- service.methods) {
private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) =
@if(method.methodType == akka.grpc.gen.Unary) {
new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else {
@if(method.methodType == akka.grpc.gen.ServerStreaming) {
new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else if(method.methodType == akka.grpc.gen.ClientStreaming) {
new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
} else if (method.methodType == akka.grpc.gen.BidiStreaming) {
new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
/**
* For access to method metadata use the parameterless version of @{method.nameSafe}
*/
def @{method.nameSafe}(in: @method.parameterType): @method.returnType =
@{method.nameSafe}().invoke(in)
}
}
}

@for(method <- service.methods) {
/**
* Lower level "lifted" version of the method, giving access to request metadata etc.
* prefer @{method.nameSafe}(@method.parameterType) if possible.
*/
@if(method.methodType == akka.grpc.gen.Unary || method.methodType == akka.grpc.gen.ClientStreaming) {
override def @{method.nameSafe}(): SingleResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(clientState.internalChannel)
} else {
override def @{method.nameSafe}(): StreamResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(clientState.internalChannel)
}
override def close(): scala.concurrent.Future[akka.Done] =
if (isChannelOwned) channel.close()
else throw new GrpcClientCloseException()

/**
* For access to method metadata use the parameterless version of @{method.nameSafe}
*/
def @{method.nameSafe}(in: @method.parameterType): @method.returnType =
@{method.nameSafe}().invoke(in)
override def closed: scala.concurrent.Future[akka.Done] = channel.closed()
}

override def close(): scala.concurrent.Future[akka.Done] = clientState.close()
override def closed: scala.concurrent.Future[akka.Done] = clientState.closed()

}

@@AkkaGrpcGenerated
object Default@{service.name}Client {

def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(settings)
}

@@AkkaGrpcGenerated
Expand Down
17 changes: 16 additions & 1 deletion docs/src/main/paradox/client/details.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,22 @@ reconnect is infinite and configurable via `GrpcClientSettings`'s `connectionAtt

The client offers a method `closed()` that returns a @scala[`Future`]@java[`CompletionStage`]
that will complete once the client is explicitly closed after invoking `close()`. The returned @scala[`Future`]@java[`CompletionStage`]
will complete with a failure when the maximum number of `connectionAttempts` (which causes a shutdown).
will complete with a failure when the maximum number of `connectionAttempts` (which causes a shutdown).

## Shared Channels

By default, each instance of a generated client creates a separate HTTP connection to the server. If the server
supports multiple services, you may want to allow multiple generated clients to share a single connection.

To do this, create a @apidoc[GrpcChannel] by passing @apidoc[GrpcClientSettings] to the apply method. You can then
use the GrpcChannel instance to create multiple generated clients; each client will use the provided channel to
communicate with the server.

When using a shared channel, the client lifecycle changes slightly. Like the generated client, `GrpcChannel` offers
`close` and `closed` methods; these can be used to explicitly close the connection to the server and detect when the
connection has been closed or shutdown due to errors, respectively. When you are done communicating with the server,
you should call `close` on the channel, rather than the individual clients. Calling `close` on a generated client
that was created with a shared channel will throw a @apidoc[GrpcClientCloseException].

## Load balancing

Expand Down
1 change: 1 addition & 0 deletions interop-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ akka.http {
server {
default-host-header = "localhost.com"
preview.enable-http2 = on
remote-address-attribute = on
}
}
100 changes: 100 additions & 0 deletions interop-tests/src/test/scala/akka/grpc/scaladsl/GrpcChannelSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc.scaladsl

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.grpc.scaladsl.tools.MutableServiceDiscovery
import akka.grpc.{ GrpcChannel, GrpcClientCloseException, GrpcClientSettings }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.RemoteAddress
import akka.http.scaladsl.server.Directives
import akka.stream.SystemMaterializer
import com.typesafe.config.{ Config, ConfigFactory }
import example.myapp.helloworld.grpc.helloworld._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpec

class GrpcClientSpecNetty extends GrpcChannelSpec()

class GrpcChannelSpec(config: Config = ConfigFactory.load())
extends AnyWordSpec
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
implicit val system = ActorSystem("GrpcChannelSpec", config)
implicit val mat = SystemMaterializer(system).materializer
implicit val ec = system.dispatcher

override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))

private val clientAddresses = new java.util.concurrent.ConcurrentHashMap[RemoteAddress.IP, Unit]
private val service = new CountingGreeterServiceImpl()
private val handler = GreeterServiceHandler(service)
private val route = Directives.extractClientIP { clientIp =>
clientAddresses.put(clientIp.toIP.get, ())
Directives.handle(handler)
}

private val server = Http().newServerAt("127.0.0.1", 0).bind(route).futureValue

private val discovery = MutableServiceDiscovery(List(server))
private val settings = GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)

"GrpcChannel" should {
"create separate connections for separate channels" in {
clientAddresses.clear()

val greeterClient1 = GreeterServiceClient(settings)
greeterClient1.sayHello(HelloRequest(s"Hello 1")).futureValue

val greeterClient2 = GreeterServiceClient(settings)
greeterClient2.sayHello(HelloRequest(s"Hello 2")).futureValue

clientAddresses.size should be(2)
}

"reuse a single connection for a shared channel" in {
clientAddresses.clear()

val channel = GrpcChannel(settings)

val greeterClient1 = GreeterServiceClient(channel)
greeterClient1.sayHello(HelloRequest(s"Hello 0")).futureValue

val greeterClient2 = GreeterServiceClient(channel)
greeterClient2.sayHello(HelloRequest(s"Hello 1")).futureValue

clientAddresses.size should be(1)
}
}

"GrpcClient" should {
"allow close on owned connection" in {
val greeterClient = GreeterServiceClient(settings)
greeterClient.sayHello(HelloRequest("Hello")).futureValue
greeterClient.close().futureValue
}

"throw an exception when closing a shared connection" in {
val channel = GrpcChannel(settings)
val greeterClient = GreeterServiceClient(channel)
greeterClient.sayHello(HelloRequest("Hello")).futureValue
assertThrows[GrpcClientCloseException] {
greeterClient.close().futureValue
}
channel.close().futureValue
}
}

override def afterAll(): Unit = {
Await.result(system.terminate(), 10.seconds)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# InternalApi: Removed ChannelUtils.closeCS method, unused since 31bb602.
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ChannelUtils.closeCS")

# InternalApi: Added private modifier to ClientState primary constructor only called by auxiliary constructor.
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ClientState.this")

# internal: Removed createPool method only called by a ClientState auxiliary constructor.
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ClientState.createPool")

# internal: Removed ClientState object only used by ClientState auxiliary constructor.
ProblemFilters.exclude[MissingClassProblem]("akka.grpc.internal.ClientState$")

# Generated: Made default service client a private inner class of servie client object.
ProblemFilters.exclude[MissingClassProblem]("grpc.reflection.v1alpha.reflection.DefaultServerReflectionClient")
ProblemFilters.exclude[MissingClassProblem]("grpc.reflection.v1alpha.reflection.DefaultServerReflectionClient$")
Loading