Skip to content

Commit

Permalink
Add support for service client connection reuse.
Browse files Browse the repository at this point in the history
Refactored ClientState into GrpcChannel.
Modified generated service clients to use GrpcChannel instead of GrpcClientSettings, with an apply() shim to create a GrpcChannel from GrpcClientSettings.
Added GrpChannelSpec to confirm that connection is being reused.
  • Loading branch information
drmontgomery committed Mar 8, 2022
1 parent c34a630 commit 911bd42
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 160 deletions.
24 changes: 13 additions & 11 deletions codegen/src/main/twirl/templates/JavaClient/Client.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.stream.Materializer;
import akka.stream.SystemMaterializer;

import akka.grpc.internal.*;
import akka.grpc.GrpcChannel;
import akka.grpc.GrpcClientSettings;
import akka.grpc.javadsl.AkkaGrpcClient;

Expand All @@ -28,26 +29,27 @@ import akka.grpc.AkkaGrpcGenerated;
@@AkkaGrpcGenerated
public abstract class @{service.name}Client extends @{service.name}ClientPowerApi implements @{service.name}, AkkaGrpcClient {
public static final @{service.name}Client create(GrpcClientSettings settings, ClassicActorSystemProvider sys) {
return new Default@{service.name}Client(settings, sys);
return new Default@{service.name}Client(akka.grpc.GrpcChannel$.MODULE$.apply(settings, sys), sys);
}

public static final @{service.name}Client create(GrpcChannel channel, ClassicActorSystemProvider sys) {
return new Default@{service.name}Client(channel, sys);
}

@@AkkaGrpcGenerated
protected final static class Default@{service.name}Client extends @{service.name}Client {

private final ClientState clientState;
private final GrpcChannel channel;
private final GrpcClientSettings settings;
private final io.grpc.CallOptions options;
private final Materializer mat;
private final ExecutionContext ec;

private Default@{service.name}Client(GrpcClientSettings settings, ClassicActorSystemProvider sys) {
this.settings = settings;
private Default@{service.name}Client(GrpcChannel channel, ClassicActorSystemProvider sys) {
this.channel = channel;
this.settings = channel.settings();
this.mat = SystemMaterializer.get(sys).materializer();
this.ec = sys.classicSystem().dispatcher();
this.clientState = new ClientState(
settings,
akka.event.Logging$.MODULE$.apply(sys.classicSystem(), Default@{service.name}Client.class, akka.event.LogSource$.MODULE$.<Default@{service.name}Client>fromAnyClass()),
sys);
this.options = NettyClientUtils.callOptions(settings);

sys.classicSystem().getWhenTerminated().whenComplete((v, e) -> close());
Expand Down Expand Up @@ -101,7 +103,7 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
public StreamResponseRequestBuilder<akka.stream.javadsl.Source<@method.inputTypeUnboxed, akka.NotUsed>, @method.outputTypeUnboxed> @{method.name}()
}
{
return @{method.name}RequestBuilder(clientState.internalChannel());
return @{method.name}RequestBuilder(channel.internalChannel());
}
}

Expand All @@ -120,15 +122,15 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
* Initiates a shutdown in which preexisting and new calls are cancelled.
*/
public java.util.concurrent.CompletionStage<akka.Done> close() {
return clientState.closeCS() ;
return channel.closeCS() ;
}

/**
* Returns a CompletionState that completes successfully when shutdown via close()
* or exceptionally if a connection can not be established after maxConnectionAttempts.
*/
public java.util.concurrent.CompletionStage<akka.Done> closed() {
return clientState.closedCS();
return channel.closedCS();
}
}

Expand Down
22 changes: 13 additions & 9 deletions codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import scala.concurrent.ExecutionContext

import akka.actor.ClassicActorSystemProvider

import akka.grpc.GrpcChannel
import akka.grpc.GrpcClientSettings

import akka.grpc.scaladsl.AkkaGrpcClient

import akka.grpc.internal.NettyClientUtils
import akka.grpc.internal.ClientState

import akka.grpc.AkkaGrpcGenerated

Expand All @@ -43,16 +43,18 @@ trait @{service.name}Client extends @{service.name} with @{service.name}ClientPo
@@AkkaGrpcGenerated
object @{service.name}Client {
def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(settings)
new Default@{service.name}Client(GrpcChannel(settings))
def apply(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(channel)
}

@@AkkaGrpcGenerated
final class Default@{service.name}Client(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
final class Default@{service.name}Client(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
import @{service.name}.MethodDescriptors._

private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher
private val settings = channel.settings
private val options = NettyClientUtils.callOptions(settings)
private val clientState = new ClientState(settings, akka.event.Logging(sys.classicSystem, classOf[Default@{service.name}Client]))

@for(method <- service.methods) {
private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) =
Expand All @@ -76,10 +78,10 @@ final class Default@{service.name}Client(settings: GrpcClientSettings)(implicit
*/
@if(method.methodType == akka.grpc.gen.Unary || method.methodType == akka.grpc.gen.ClientStreaming) {
override def @{method.nameSafe}(): SingleResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(clientState.internalChannel)
@{method.name}RequestBuilder(channel.internalChannel)
} else {
override def @{method.nameSafe}(): StreamResponseRequestBuilder[@method.parameterType, @method.outputTypeUnboxed] =
@{method.name}RequestBuilder(clientState.internalChannel)
@{method.name}RequestBuilder(channel.internalChannel)
}

/**
Expand All @@ -89,16 +91,18 @@ final class Default@{service.name}Client(settings: GrpcClientSettings)(implicit
@{method.nameSafe}().invoke(in)
}

override def close(): scala.concurrent.Future[akka.Done] = clientState.close()
override def closed: scala.concurrent.Future[akka.Done] = clientState.closed()
override def close(): scala.concurrent.Future[akka.Done] = channel.close()
override def closed: scala.concurrent.Future[akka.Done] = channel.closed()

}

@@AkkaGrpcGenerated
object Default@{service.name}Client {

def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(settings)
new Default@{service.name}Client(GrpcChannel(settings))
def apply(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
new Default@{service.name}Client(channel)
}

@@AkkaGrpcGenerated
Expand Down
1 change: 1 addition & 0 deletions interop-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ akka.http {
server {
default-host-header = "localhost.com"
preview.enable-http2 = on
remote-address-attribute = on
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc.scaladsl

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.grpc.scaladsl.tools.MutableServiceDiscovery
import akka.grpc.{ GrpcChannel, GrpcClientSettings }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.RemoteAddress
import akka.http.scaladsl.server.Directives
import akka.stream.SystemMaterializer
import com.typesafe.config.{ Config, ConfigFactory }
import example.myapp.helloworld.grpc.helloworld._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpec

class GrpcClientSpecNetty extends GrpcChannelSpec()

class GrpcChannelSpec(config: Config = ConfigFactory.load())
extends AnyWordSpec
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
implicit val system = ActorSystem("GrpcChannelSpec", config)
implicit val mat = SystemMaterializer(system).materializer
implicit val ec = system.dispatcher

override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))

"GrpcChannel" should {
"create separate connections for separate channels" in {
val clientAddresses = scala.collection.concurrent.TrieMap.empty[RemoteAddress.IP, Unit]
val service = new CountingGreeterServiceImpl()
val handler = GreeterServiceHandler(service)
val route = Directives.extractClientIP { clientIp =>
clientAddresses.put(clientIp.toIP.get, ())
Directives.handle(handler)
}

val server = Http().newServerAt("127.0.0.1", 0).bind(route).futureValue

val discovery = MutableServiceDiscovery(List(server))
val settings = GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)

val greeterClient1 = GreeterServiceClient(settings)
greeterClient1.sayHello(HelloRequest(s"Hello 1")).futureValue

val greeterClient2 = GreeterServiceClient(settings)
greeterClient2.sayHello(HelloRequest(s"Hello 2")).futureValue

clientAddresses.size should be(2)
}
"reuse a single connection for a shared channel" in {
val clientAddresses = scala.collection.concurrent.TrieMap.empty[RemoteAddress.IP, Unit]
val service = new CountingGreeterServiceImpl()
val handler = GreeterServiceHandler(service)
val route = Directives.extractClientIP { clientIp =>
clientAddresses.put(clientIp.toIP.get, ())
Directives.handle(handler)
}

val server = Http().newServerAt("127.0.0.1", 0).bind(route).futureValue

val discovery = MutableServiceDiscovery(List(server))
val settings = GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)
val channel = GrpcChannel(settings)

val greeterClient1 = GreeterServiceClient(channel)
greeterClient1.sayHello(HelloRequest(s"Hello 0")).futureValue

val greeterClient2 = GreeterServiceClient(channel)
greeterClient2.sayHello(HelloRequest(s"Hello 1")).futureValue

clientAddresses.size should be(1)
}
}

override def afterAll(): Unit = {
Await.result(system.terminate(), 10.seconds)
}
}
44 changes: 44 additions & 0 deletions runtime/src/main/scala/akka/grpc/GrpcChannel.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc

import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._
import scala.concurrent.Future

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.grpc.internal.{ ChannelUtils, InternalChannel }
import akka.grpc.scaladsl.Grpc

class GrpcChannel(val settings: GrpcClientSettings, @InternalApi val internalChannel: InternalChannel)(
implicit sys: ClassicActorSystemProvider) {

Grpc(sys).registerChannel(this)

def closeCS(): CompletionStage[Done] =
close().toJava

def closedCS(): CompletionStage[Done] =
closed().toJava

def close(): Future[akka.Done] = {
Grpc(sys).deregisterChannel(this)
ChannelUtils.close(internalChannel)
}

def closed(): Future[akka.Done] =
internalChannel.done
}

object GrpcChannel {
def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): GrpcChannel = {
new GrpcChannel(
settings,
ChannelUtils.create(settings, akka.event.Logging(sys.classicSystem, classOf[GrpcChannel])))
}
}
19 changes: 18 additions & 1 deletion runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import akka.Done
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import io.grpc.{ ConnectivityState, ManagedChannel }

import scala.compat.java8.FutureConverters._
import scala.concurrent.{ Future, Promise }

import akka.actor.ClassicActorSystemProvider
import akka.grpc.GrpcClientSettings

/**
* Used to indicate that a gRPC client can not establish a connection
* after the configured number of attempts.
Expand All @@ -29,6 +31,21 @@ class ClientConnectionException(msg: String) extends RuntimeException(msg)
@InternalApi
object ChannelUtils {

/**
* INTERNAL API
*/
@InternalApi
def create(settings: GrpcClientSettings, log: LoggingAdapter)(
implicit sys: ClassicActorSystemProvider): InternalChannel = {
settings.backend match {
case "netty" =>
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher)
case "akka-http" =>
AkkaHttpClientUtils.createChannel(settings, log)
case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]")
}
}

/**
* INTERNAL API
*/
Expand Down
66 changes: 0 additions & 66 deletions runtime/src/main/scala/akka/grpc/internal/ClientState.scala

This file was deleted.

Loading

0 comments on commit 911bd42

Please sign in to comment.