Skip to content

Commit

Permalink
chore: A bit less noise on gRPC journal client fail/reconnect (#980)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Aug 31, 2023
1 parent 72614aa commit c970a82
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
import akka.projection.grpc.internal.proto
Expand All @@ -52,6 +53,7 @@ import akka.util.Timeout
import com.google.protobuf.Descriptors
import com.google.protobuf.timestamp.Timestamp
import com.typesafe.config.Config
import io.grpc.Status
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand All @@ -61,7 +63,6 @@ import java.util.concurrent.TimeUnit
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

@ApiMayChange
object GrpcReadJournal {
val Identifier = "akka.projection.grpc.consumer"
Expand Down Expand Up @@ -339,6 +340,13 @@ final class GrpcReadJournal private (
addRequestHeaders(client.eventsBySlices())
.invoke(streamIn)
.recover {
case ex: akka.grpc.GrpcServiceException if ex.status.getCode == Status.Code.UNAVAILABLE =>
// this means we couldn't connect, will be retried, relatively common, so make it less noisy
throw new ConnectionException(
clientSettings.serviceName,
clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString),
streamId)

case th: Throwable =>
throw new RuntimeException(s"Failure to consume gRPC event stream for [${streamId}]", th)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.internal

import akka.annotation.InternalApi

import scala.util.control.NoStackTrace

/**
* INTERNAL API
*/
@InternalApi
private[akka] final class ConnectionException(host: String, port: String, streamId: String)
extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost")
with NoStackTrace

0 comments on commit c970a82

Please sign in to comment.