Skip to content

Commit

Permalink
fewer apply factories, use built in serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Oct 3, 2024
1 parent 03a3816 commit a94c67c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.projection.grpc.replication

import akka.Done
import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -32,92 +31,22 @@ import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
import akka.serialization.BaseSerializer
import akka.serialization.ByteBufferSerializer
import akka.serialization.SerializerWithStringManifest
import akka.testkit.SocketUtil
import com.google.protobuf.CodedInputStream
import com.google.protobuf.CodedOutputStream
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
import scalapb.GeneratedMessage
import scalapb.GeneratedMessageCompanion

import java.nio.ByteBuffer
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

object ReplicationProtoEventIntegrationSpec {

class ProtobufSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer
with ByteBufferSerializer {

private val TagChangedManifest = akka.projection.grpc.test.TagChanged.javaDescriptor.getName
private val GreetingChangedManifest = akka.projection.grpc.test.GreetingChanged.javaDescriptor.getName

override def manifest(o: AnyRef): String = {
o match {
case msg: GeneratedMessage => msg.companion.javaDescriptor.getName
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
}

override def toBinary(o: AnyRef): Array[Byte] = {
o match {
case msg: GeneratedMessage => msg.toByteArray
case _ =>
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
}

override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = {
o match {
case msg: GeneratedMessage =>
val output = CodedOutputStream.newInstance(buf)
msg.writeTo(output)
output.flush()
case _ =>
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
}

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
getCompanion(manifest).parseFrom(bytes).asInstanceOf[AnyRef]
}

override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = {
getCompanion(manifest).parseFrom(CodedInputStream.newInstance(buf)).asInstanceOf[AnyRef]
}

private def getCompanion(manifest: String): GeneratedMessageCompanion[_] = manifest match {
case GreetingChangedManifest => akka.projection.grpc.test.GreetingChanged.messageCompanion
case TagChangedManifest => akka.projection.grpc.test.TagChanged.messageCompanion
case unknown => throw new IllegalArgumentException(s"Unknown manifest $unknown")
}

}

private def config(dc: ReplicaId): Config =
ConfigFactory.parseString(s"""
akka.actor.provider = cluster
akka.actor {
serializers {
my-replication-serializer = "akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer"
}
serialization-identifiers {
"akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer" = 1528148901
}
serialization-bindings {
"scalapb.GeneratedMessage" = my-replication-serializer
}
}
akka.http.server.preview.enable-http2 = on
akka.persistence.r2dbc {
journal.table = "event_journal_${dc.id}"
Expand Down Expand Up @@ -156,6 +85,7 @@ object ReplicationProtoEventIntegrationSpec {
final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command

// events are defined directly as proto messages
// Note: uses the default auto-serialization of protobuf generated messages
private implicit class ProtoLwwToScalaLww(protoLwwTime: akka.projection.grpc.test.LwwTime) {
def toScala: LwwTime = LwwTime(protoLwwTime.timestamp, ReplicaId(protoLwwTime.originReplica))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import java.util
import java.util.Optional
import java.util.UUID
import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.grpc.GrpcClientSettings
Expand Down Expand Up @@ -72,14 +71,14 @@ object GrpcReadJournal {
clientSettings: GrpcClientSettings,
protobufDescriptors: java.util.List[Descriptors.FileDescriptor]): GrpcReadJournal = {
import akka.util.ccompat.JavaConverters._
val protoAnySerialization =
new ProtoAnySerialization(
system.classicSystem.toTyped,
protobufDescriptors.asScala.toVector,
ProtoAnySerialization.Prefer.Java)
new GrpcReadJournal(
scaladsl
.GrpcReadJournal(
settings,
clientSettings,
protobufDescriptors.asScala.toList,
ProtoAnySerialization.Prefer.Java,
replicationSettings = None)(system))
.GrpcReadJournal(settings, clientSettings, protoAnySerialization, replicationSettings = None)(system))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
Expand All @@ -30,7 +29,6 @@ import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.ProjectionGrpcSerialization
import akka.projection.grpc.internal.DelegateToAkkaSerialization
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
Expand Down Expand Up @@ -112,55 +110,25 @@ object GrpcReadJournal {
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor])(
implicit system: ClassicActorSystemProvider): GrpcReadJournal =
apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala, replicationSettings = None)

/**
* INTERNAL API
*/
@InternalApi private[akka] def apply(
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor],
protobufPrefer: ProtoAnySerialization.Prefer,
replicationSettings: Option[ReplicationSettings[_]])(
implicit system: ClassicActorSystemProvider): GrpcReadJournal = {

// FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance,
// and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings?

val protoAnySerialization =
new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, protobufPrefer)

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter(
settings.streamId,
settings.initialConsumerFilter)
}

new scaladsl.GrpcReadJournal(
system.classicSystem.asInstanceOf[ExtendedActorSystem],
settings,
withChannelBuilderOverrides(clientSettings),
protoAnySerialization,
replicationSettings)
new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, ProtoAnySerialization.Prefer.Scala)
apply(settings, clientSettings, protoAnySerialization, replicationSettings = None)
}

/**
* INTERNAL API
*
* Factory method for replication, with replication specific serialization
*/
@InternalApi private[akka] def apply(
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
replicationSettings: Option[ReplicationSettings[_]])(implicit system: ActorSystem[_]): GrpcReadJournal = {
wireSerialization: ProjectionGrpcSerialization,
replicationSettings: Option[ReplicationSettings[_]])(
implicit system: ClassicActorSystemProvider): GrpcReadJournal = {

// FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance,
// and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings?

val wireSerialization = new DelegateToAkkaSerialization(system)

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter(
settings.streamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.grpc.internal.DelegateToAkkaSerialization
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
Expand Down Expand Up @@ -164,7 +165,10 @@ private[akka] object ReplicationImpl {
case None => s2
}
}
val eventsBySlicesQuery = GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, Some(settings))

val wireSerialization = new DelegateToAkkaSerialization(system)
val eventsBySlicesQuery =
GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, wireSerialization, Some(settings))
log.infoN(
"Starting {} projection streams{} consuming events for Replicated Entity [{}] from [{}] (at {}:{})",
remoteReplica.numberOfConsumers,
Expand Down

0 comments on commit a94c67c

Please sign in to comment.