From 0e95c38be0d51d0de0adbbc108252dcbae4484fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 1 Oct 2024 16:05:34 +0200 Subject: [PATCH 1/4] fix: Do not special handle protobuf payloads for replication --- .../consumer/scaladsl/GrpcReadJournal.scala | 39 ++++++- .../internal/EventProducerServiceImpl.scala | 11 +- .../grpc/internal/EventPusher.scala | 6 +- .../grpc/internal/ProtoAnySerialization.scala | 106 ++++++++++++++---- .../ProtobufProtocolConversions.scala | 12 +- .../grpc/producer/EventProducerSettings.scala | 21 +++- .../internal/ReplicationImpl.scala | 12 +- 7 files changed, 157 insertions(+), 50 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 94a2ffc93..328813baf 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -8,6 +8,7 @@ import akka.Done import akka.NotUsed import akka.actor.ClassicActorSystemProvider import akka.actor.ExtendedActorSystem +import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi @@ -28,6 +29,8 @@ import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides +import akka.projection.grpc.internal.AkkaProjectionGrpcSerialization +import akka.projection.grpc.internal.DelegateToAkkaSerialization import akka.projection.grpc.internal.ConnectionException import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.ProtobufProtocolConversions @@ -53,14 +56,13 @@ import io.grpc.Status import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder import org.slf4j.Logger import org.slf4j.LoggerFactory + import java.time.Instant import java.util.UUID import java.util.concurrent.TimeUnit - import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future - import akka.projection.grpc.internal.proto.ReplayPersistenceId import akka.projection.grpc.internal.proto.ReplicaInfo import akka.projection.grpc.replication.scaladsl.ReplicationSettings @@ -144,6 +146,35 @@ object GrpcReadJournal { replicationSettings) } + /** + * INTERNAL API + * + * Factory method for replication, with replication specific serialization + */ + @InternalApi private[akka] def apply( + settings: GrpcQuerySettings, + clientSettings: GrpcClientSettings, + replicationSettings: Option[ReplicationSettings[_]])(implicit system: ActorSystem[_]): GrpcReadJournal = { + + // FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance, + // and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings? + + val wireSerialization = new DelegateToAkkaSerialization(system) + + if (settings.initialConsumerFilter.nonEmpty) { + ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter( + settings.streamId, + settings.initialConsumerFilter) + } + + new scaladsl.GrpcReadJournal( + system.classicSystem.asInstanceOf[ExtendedActorSystem], + settings, + withChannelBuilderOverrides(clientSettings), + wireSerialization, + replicationSettings) + } + private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = { // compose with potential user overrides to allow overriding our defaults clientSettings.withChannelBuilderOverrides(channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides)) @@ -160,7 +191,7 @@ final class GrpcReadJournal private ( system: ExtendedActorSystem, settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, - protoAnySerialization: ProtoAnySerialization, + wireSerialization: AkkaProjectionGrpcSerialization, replicationSettings: Option[ReplicationSettings[_]]) extends ReadJournal with EventsBySliceQuery @@ -424,7 +455,7 @@ final class GrpcReadJournal private ( // not the normal entity type which is internal to the producing side streamId: String): EventEnvelope[Evt] = { require(streamId == settings.streamId, s"Stream id mismatch, was [$streamId], expected [${settings.streamId}]") - ProtobufProtocolConversions.eventToEnvelope(event, protoAnySerialization) + ProtobufProtocolConversions.eventToEnvelope(event, wireSerialization) } private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 79923b354..d9d580fce 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -86,7 +86,8 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation "Use Transformation.identity to pass through each event as is.") } - private val protoAnySerialization = new ProtoAnySerialization(system) + private val protoAnyWireSerialization = new ProtoAnySerialization(system) + private val akkaOnlyWireSerialization = new DelegateToAkkaSerialization(system) private val streamIdToSourceMap: Map[String, EventProducer.EventProducerSource] = sources.map(s => s.streamId -> s).toMap @@ -97,6 +98,10 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation .map(s => s"(stream id: [${s.streamId}], entity type: [${s.entityType}])") .mkString(", ")) + private def wireSerialization(eps: EventProducer.EventProducerSource): AkkaProjectionGrpcSerialization = + if (eps.settings.akkaSerializationOnly) akkaOnlyWireSerialization + else protoAnyWireSerialization + private def intercept(streamId: String, metadata: Metadata): Future[Done] = interceptor match { case Some(interceptor) => interceptor.intercept(streamId, metadata) @@ -229,7 +234,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation ProtobufProtocolConversions.offsetToProtoOffset(env.offset))))) case _ => import system.executionContext - transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) + transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource)) .map { case Some(event) => if (log.isTraceEnabled) @@ -307,7 +312,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation .flatMap(f => req.replicaInfo.map(f.createFilter)) .getOrElse((_: EventEnvelope[_]) => true) if (eventOriginFilter(env)) { - transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) + transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource)) .map { case Some(event) => log.traceN( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala index 078903c2b..88d9b3494 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala @@ -65,7 +65,9 @@ private[akka] object EventPusher { import akka.projection.grpc.internal.ProtobufProtocolConversions.transformAndEncodeEvent implicit val ec: ExecutionContext = system.executionContext - val protoAnySerialization = new ProtoAnySerialization(system) + val wireSerialization = + if (eps.settings.akkaSerializationOnly) new DelegateToAkkaSerialization(system) + else new ProtoAnySerialization(system) def filterAndTransformFlow(filters: Future[proto.ConsumerEventStart]) : Flow[(EventEnvelope[Event], ProjectionContext), (ConsumeEventIn, ProjectionContext), NotUsed] = @@ -119,7 +121,7 @@ private[akka] object EventPusher { envelope.sequenceNr, startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) - transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization) + transformAndEncodeEvent(eps.transformation, envelope, wireSerialization) } else { if (logger.isTraceEnabled()) logger.trace( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala index e337d1f7e..f1ebbf845 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala @@ -7,7 +7,6 @@ package akka.projection.grpc.internal import scala.collection.concurrent.TrieMap import scala.collection.immutable import scala.util.Try - import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi @@ -23,6 +22,7 @@ import com.google.protobuf.Descriptors import com.google.protobuf.GeneratedMessageV3 import com.google.protobuf.Message import com.google.protobuf.Parser +import com.google.protobuf.UnsafeByteOperations import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.{ Any => JavaPbAny } import com.google.protobuf.{ Any => PbAny } @@ -99,15 +99,45 @@ import scalapb.options.Scalapb } } + def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = { + if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId" + else + s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest" + } + + def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = { + val idAndManifest = + typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) + val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) + if (i == -1) + idAndManifest.toInt -> "" + else + idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) + } + } /** * INTERNAL API */ -@InternalApi private[akka] class ProtoAnySerialization( +@InternalApi +private[akka] trait AkkaProjectionGrpcSerialization { + def serialize(event: Any): ScalaPbAny + def deserialize(scalaPbAny: ScalaPbAny): Any + def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] +} + +/** + * Primarily intended for direct usage of grpc projections, where there may be a public (protobuf) protocol published + * by the producing service. + * + * INTERNAL API + */ +@InternalApi private[akka] final class ProtoAnySerialization( system: ActorSystem[_], descriptors: immutable.Seq[Descriptors.FileDescriptor], - prefer: ProtoAnySerialization.Prefer) { + prefer: ProtoAnySerialization.Prefer) + extends AkkaProjectionGrpcSerialization { import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) @@ -138,7 +168,7 @@ import scalapb.options.Scalapb def this(system: ActorSystem[_]) = this(system, descriptors = Nil, ProtoAnySerialization.Prefer.Scala) - def serialize(event: Any): ScalaPbAny = { + override def serialize(event: Any): ScalaPbAny = { event match { case scalaPbAny: ScalaPbAny if scalaPbAny.typeUrl.startsWith(GoogleTypeUrlPrefix) => ScalaPbAny(ProtoAnyTypeUrl, scalaPbAny.toByteString) @@ -153,15 +183,15 @@ import scalapb.options.Scalapb case other => // fallback to Akka serialization val otherAnyRef = other.asInstanceOf[AnyRef] - val bytes = serialization.serialize(otherAnyRef).get val serializer = serialization.findSerializerFor(otherAnyRef) + val bytes = serializer.toBinary(otherAnyRef) val manifest = Serializers.manifestFor(serializer, otherAnyRef) val id = serializer.identifier - ScalaPbAny(akkaSerializationTypeUrl(id, manifest), ByteString.copyFrom(bytes)) + ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes)) } } - def deserialize(scalaPbAny: ScalaPbAny): Any = { + override def deserialize(scalaPbAny: ScalaPbAny): Any = { val typeUrl = scalaPbAny.typeUrl if (typeUrl == ProtoAnyTypeUrl) { if (prefer == Prefer.Scala) @@ -172,6 +202,7 @@ import scalapb.options.Scalapb decodeMessage(scalaPbAny) } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) + // FIXME could potentially optimize to use byte buffer here instead of copy to byte array serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get } else if (prefer == Prefer.Scala) { // when custom typeUrl @@ -182,7 +213,7 @@ import scalapb.options.Scalapb } } - def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = { + override def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = { // see corresponding typeUrl cases in `deserialize` val typeUrl = scalaPbAny.typeUrl @@ -224,22 +255,6 @@ import scalapb.options.Scalapb } } - private def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = { - if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId" - else - s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest" - } - - private def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = { - val idAndManifest = - typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) - val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) - if (i == -1) - idAndManifest.toInt -> "" - else - idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) - } - private def strippedFileName(fileName: String) = fileName.split(Array('/', '\\')).last.stripSuffix(".proto") @@ -451,3 +466,46 @@ import scalapb.options.Scalapb } } + +/** + * Primarily intended for replication where there is no public protocol + * + * INTERNAL API + */ +@InternalApi +private[akka] final class DelegateToAkkaSerialization(system: ActorSystem[_]) extends AkkaProjectionGrpcSerialization { + import ProtoAnySerialization._ + private val serialization = SerializationExtension(system.classicSystem) + + override def serialize(event: Any): ScalaPbAny = { + val anyRefEvent = event.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(anyRefEvent) + val bytes = serializer.toBinary(anyRefEvent) + val manifest = Serializers.manifestFor(serializer, anyRefEvent) + val id = serializer.identifier + ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes)) + } + + override def deserialize(event: ScalaPbAny): Any = { + val typeUrl = event.typeUrl + if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) + // FIXME could potentially optimize to use byte buffer here instead of copy to byte array + serialization.deserialize(event.value.toByteArray, id, manifest).get + } else { + throw new IllegalArgumentException( + s"Got event with type url: [${typeUrl}] but only type urls with Akka serializer prefix ($AkkaSerializationTypeUrlPrefix) supported") + } + } + + override def toSerializedEvent(event: ScalaPbAny): Option[SerializedEvent] = { + val typeUrl = event.typeUrl + if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) + Some(new SerializedEvent(event.value.toByteArray, id, manifest)) + } else { + // We don't try to optimize this case. One level of indirection too much, and probably not a common case. + None + } + } +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 3ff15312e..4182e9f3d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -105,7 +105,7 @@ private[akka] object ProtobufProtocolConversions { def transformAndEncodeEvent( transformation: Transformation, env: EventEnvelope[_], - protoAnySerialization: ProtoAnySerialization)( + protoAnySerialization: AkkaProjectionGrpcSerialization)( implicit executionContext: ExecutionContext): Future[Option[Event]] = { env.eventOption match { case Some(_) => @@ -147,22 +147,22 @@ private[akka] object ProtobufProtocolConversions { } } - def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] = + def eventToEnvelope[Evt](event: Event, protoAnySerialization: AkkaProjectionGrpcSerialization): EventEnvelope[Evt] = eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]] def eventToEnvelope( event: Event, - protoAnySerialization: ProtoAnySerialization, + wireSerialization: AkkaProjectionGrpcSerialization, deserializeEvent: Boolean): EventEnvelope[Any] = { val eventOffset = populateSeenIfNeeded( TimestampOffset.toTimestampOffset(protocolOffsetToOffset(event.offset)), event.persistenceId, event.seqNr) - val metadata: Option[Any] = event.metadata.map(protoAnySerialization.deserialize) + val metadata: Option[Any] = event.metadata.map(wireSerialization.deserialize) def envelopeWithDeserializedEvent: EventEnvelope[Any] = { - val evt = event.payload.map(protoAnySerialization.deserialize) + val evt = event.payload.map(wireSerialization.deserialize) new EventEnvelope( eventOffset, event.persistenceId, @@ -180,7 +180,7 @@ private[akka] object ProtobufProtocolConversions { if (deserializeEvent || event.payload.isEmpty) { envelopeWithDeserializedEvent } else { - protoAnySerialization.toSerializedEvent(event.payload.get) match { + wireSerialization.toSerializedEvent(event.payload.get) match { case Some(serializedEvent) => new EventEnvelope( eventOffset, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala index add0854ce..e93a6c3da 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala @@ -5,6 +5,7 @@ package akka.projection.grpc.producer import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config @@ -23,7 +24,8 @@ object EventProducerSettings { transformationParallelism = config.getInt("transformation-parallelism"), replayParallelism = config.getInt("filter.replay-parallelism"), topicTagPrefix = config.getString("filter.topic-tag-prefix"), - keepAliveInterval = config.getDuration("keep-alive-interval").asScala) + keepAliveInterval = config.getDuration("keep-alive-interval").asScala, + akkaSerializationOnly = false) } /** Java API */ @@ -40,8 +42,23 @@ final class EventProducerSettings private ( val transformationParallelism: Int, val replayParallelism: Int, val topicTagPrefix: String, - val keepAliveInterval: FiniteDuration) { + val keepAliveInterval: FiniteDuration, + val akkaSerializationOnly: Boolean) { require(transformationParallelism >= 1, "Configuration property [transformation-parallelism] must be >= 1.") require(replayParallelism >= 1, "Configuration property [replay-parallelism] must be >= 1.") + /** + * INTERNAL API + */ + @InternalApi + private[akka] def withAkkaSerializationOnly(): EventProducerSettings = { + new EventProducerSettings( + queryPluginId, + transformationParallelism, + replayParallelism, + topicTagPrefix, + keepAliveInterval, + akkaSerializationOnly = true) + } + } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index f3cac05ac..d72da3f7e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -36,7 +36,6 @@ import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal -import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.producer.scaladsl.EventProducer import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation @@ -98,7 +97,7 @@ private[akka] object ReplicationImpl { settings.entityTypeKey.name, settings.streamId, Transformation.identity, - settings.eventProducerSettings, + settings.eventProducerSettings.withAkkaSerializationOnly(), settings.producerFilter) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) @@ -165,12 +164,7 @@ private[akka] object ReplicationImpl { case None => s2 } } - val eventsBySlicesQuery = GrpcReadJournal( - grpcQuerySettings, - remoteReplica.grpcClientSettings, - Nil, - ProtoAnySerialization.Prefer.Scala, - Some(settings)) + val eventsBySlicesQuery = GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, Some(settings)) log.infoN( "Starting {} projection streams{} consuming events for Replicated Entity [{}] from [{}] (at {}:{})", remoteReplica.numberOfConsumers, @@ -347,7 +341,7 @@ private[akka] object ReplicationImpl { settings.entityTypeKey.name, settings.streamId, Transformation.identity, - settings.eventProducerSettings) + settings.eventProducerSettings.withAkkaSerializationOnly()) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) val epp = From 30f4a74731b5758fa662eac6f4ce4db636d8cae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 2 Oct 2024 11:15:55 +0200 Subject: [PATCH 2/4] Test coverage --- .../test/replication_integration_events.proto | 20 + ...ReplicationProtoEventIntegrationSpec.scala | 356 ++++++++++++++++++ ... => AkkaProjectionGrpcSerialization.scala} | 20 +- 3 files changed, 386 insertions(+), 10 deletions(-) create mode 100644 akka-projection-grpc-integration/src/test/protobuf/akka/projection/grpc/test/replication_integration_events.proto create mode 100644 akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala rename akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/{ProtoAnySerialization.scala => AkkaProjectionGrpcSerialization.scala} (100%) diff --git a/akka-projection-grpc-integration/src/test/protobuf/akka/projection/grpc/test/replication_integration_events.proto b/akka-projection-grpc-integration/src/test/protobuf/akka/projection/grpc/test/replication_integration_events.proto new file mode 100644 index 000000000..d6487f47e --- /dev/null +++ b/akka-projection-grpc-integration/src/test/protobuf/akka/projection/grpc/test/replication_integration_events.proto @@ -0,0 +1,20 @@ +// Copyright 2022 Lightbend Inc. + +syntax = "proto3"; + +package akka.projection.grpc.test; + +message GreetingChanged { + string greeting = 1; + LwwTime timestamp = 2; +} + +message TagChanged { + string tag = 1; + LwwTime timestamp = 2; +} + +message LwwTime { + uint64 timestamp = 1; + string origin_replica = 2; +} \ No newline at end of file diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala new file mode 100644 index 000000000..a37fdc0e6 --- /dev/null +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala @@ -0,0 +1,356 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import akka.Done +import akka.actor.ExtendedActorSystem +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.crdt.LwwTime +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.serialization.BaseSerializer +import akka.serialization.ByteBufferSerializer +import akka.serialization.SerializerWithStringManifest +import akka.testkit.SocketUtil +import com.google.protobuf.CodedInputStream +import com.google.protobuf.CodedOutputStream +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory +import scalapb.GeneratedMessage +import scalapb.GeneratedMessageCompanion + +import java.nio.ByteBuffer +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +object ReplicationProtoEventIntegrationSpec { + + class ProtobufSerializer(val system: ExtendedActorSystem) + extends SerializerWithStringManifest + with BaseSerializer + with ByteBufferSerializer { + + private val TagChangedManifest = akka.projection.grpc.test.TagChanged.javaDescriptor.getName + private val GreetingChangedManifest = akka.projection.grpc.test.GreetingChanged.javaDescriptor.getName + + override def manifest(o: AnyRef): String = { + o match { + case msg: GeneratedMessage => msg.companion.javaDescriptor.getName + case _ => + throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") + } + } + + override def toBinary(o: AnyRef): Array[Byte] = { + o match { + case msg: GeneratedMessage => msg.toByteArray + case _ => + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + } + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = { + o match { + case msg: GeneratedMessage => + val output = CodedOutputStream.newInstance(buf) + msg.writeTo(output) + output.flush() + case _ => + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + getCompanion(manifest).parseFrom(bytes).asInstanceOf[AnyRef] + } + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { + getCompanion(manifest).parseFrom(CodedInputStream.newInstance(buf)).asInstanceOf[AnyRef] + } + + private def getCompanion(manifest: String): GeneratedMessageCompanion[_] = manifest match { + case GreetingChangedManifest => akka.projection.grpc.test.GreetingChanged.messageCompanion + case TagChangedManifest => akka.projection.grpc.test.TagChanged.messageCompanion + case unknown => throw new IllegalArgumentException(s"Unknown manifest $unknown") + } + + } + + private def config(dc: ReplicaId): Config = + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.actor { + serializers { + my-replication-serializer = "akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer" + } + serialization-identifiers { + "akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer" = 1528148901 + } + serialization-bindings { + "scalapb.GeneratedMessage" = my-replication-serializer + } + } + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + journal.table = "event_journal_${dc.id}" + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "akka_projection_timestamp_offset_store_${dc.id}" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + + private val DCA = ReplicaId("DCA") + private val DCB = ReplicaId("DCB") + + object LWWHelloWorld { + + val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world") + + sealed trait Command + final case class Get(replyTo: ActorRef[String]) extends Command + final case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command + + // events are defined directly as proto messages + private implicit class ProtoLwwToScalaLww(protoLwwTime: akka.projection.grpc.test.LwwTime) { + def toScala: LwwTime = LwwTime(protoLwwTime.timestamp, ReplicaId(protoLwwTime.originReplica)) + } + private implicit class ScalaLwwToProtoLww(lwwTime: LwwTime) { + def toProto: akka.projection.grpc.test.LwwTime = + akka.projection.grpc.test.LwwTime(lwwTime.timestamp, lwwTime.originReplica.id) + } + + object State { + val initial = + State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "", LwwTime(Long.MinValue, ReplicaId(""))) + } + + case class State(greeting: String, greetingTimestamp: LwwTime, tag: String, tagTimestamp: LwwTime) + + def apply(replicatedBehaviors: ReplicatedBehaviors[Command, GeneratedMessage, State]) = + replicatedBehaviors.setup { replicationContext => + EventSourcedBehavior[Command, GeneratedMessage, State]( + replicationContext.persistenceId, + State.initial, { + case (State(greeting, _, _, _), Get(replyTo)) => + replyTo ! greeting + Effect.none + case (state, SetGreeting(greeting, replyTo)) => + Effect + .persist( + akka.projection.grpc.test.GreetingChanged( + greeting, + Some(state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId) + .toProto))) + .thenRun((_: State) => replyTo ! Done) + case (state, SetTag(tag, replyTo)) => + Effect + .persist( + akka.projection.grpc.test.TagChanged( + tag, + Some(state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId) + .toProto))) + .thenRun((_: State) => replyTo ! Done) + }, { + case (currentState, akka.projection.grpc.test.GreetingChanged(newGreeting, newTimestamp, _)) => + if (newTimestamp.get.toScala.isAfter(currentState.greetingTimestamp)) + currentState.copy(newGreeting, newTimestamp.get.toScala) + else currentState + case (currentState, akka.projection.grpc.test.TagChanged(newTag, newTimestamp, _)) => + if (newTimestamp.get.toScala.isAfter(currentState.tagTimestamp)) + currentState.copy(tag = newTag, tagTimestamp = newTimestamp.get.toScala) + else currentState + case (_, unknown) => + throw new IllegalArgumentException(s"Unknown proto event type ${unknown.getClass}") + }) + .withTaggerForState { + case (state, _) => if (state.tag == "") Set.empty else Set(state.tag) + } + } + } +} + +class ReplicationProtoEventIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "ReplicationProtoEventIntegrationSpecA", + ReplicationProtoEventIntegrationSpec + .config(ReplicationProtoEventIntegrationSpec.DCA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing { + import ReplicationProtoEventIntegrationSpec._ + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[ReplicationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "ReplicationIntegrationSpecB", + ReplicationProtoEventIntegrationSpec.config(DCB).withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) + private val allDcsAndPorts = Seq(DCA, DCB).zip(grpcPorts) + private val allReplicas = allDcsAndPorts.map { + case (id, port) => + Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) + }.toSet + + private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1))) + private val systemPerDc = Map(DCA -> system, DCB -> systems(1)) + private val entityIds = Set("one", "two", "three") + + override protected def beforeAll(): Unit = { + super.beforeAll() + systemPerDc.values.foreach(beforeAllDeleteFromTables) + } + + def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { + val settings = ReplicationSettings[LWWHelloWorld.Command]( + LWWHelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings(replicaSystem), + allReplicas, + 10.seconds, + 8, + R2dbcReplication()) + Replication.grpcReplication(settings)(ReplicationProtoEventIntegrationSpec.LWWHelloWorld.apply)(replicaSystem) + } + + "Replication over gRPC with protobuf events" should { + "form two one node clusters" in { + testKitsPerDc.values.foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start two replicas" in { + val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { + case (replica, index) => + val system = systems(index) + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + system.name, + replica.grpcClientSettings.defaultPort) + val started = startReplica(system, replica.replicaId) + val grpcPort = grpcPorts(index) + + // start producer server + Http()(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => replica.replicaId -> started) + }) + + replicasStarted.futureValue + logger.info("Both replication/producer services bound") + } + + "replicate writes from one dc to the other" in { + systemPerDc.keys.foreach { dc => + withClue(s"from ${dc.id}") { + Future + .sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _)) + }) + .futureValue + + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${dc.id}") + }, 10.seconds) + } + } + } + } + } + } + } + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down both DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala similarity index 100% rename from akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala rename to akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala index f1ebbf845..d95c66082 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala @@ -31,6 +31,16 @@ import scalapb.GeneratedMessage import scalapb.GeneratedMessageCompanion import scalapb.options.Scalapb +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait AkkaProjectionGrpcSerialization { + def serialize(event: Any): ScalaPbAny + def deserialize(scalaPbAny: ScalaPbAny): Any + def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] +} + /** * INTERNAL API */ @@ -117,16 +127,6 @@ import scalapb.options.Scalapb } -/** - * INTERNAL API - */ -@InternalApi -private[akka] trait AkkaProjectionGrpcSerialization { - def serialize(event: Any): ScalaPbAny - def deserialize(scalaPbAny: ScalaPbAny): Any - def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] -} - /** * Primarily intended for direct usage of grpc projections, where there may be a public (protobuf) protocol published * by the producing service. From 03a38167584fc0967a944fc7300379a37d558de0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 3 Oct 2024 10:13:48 +0200 Subject: [PATCH 3/4] Drop Akka name prefix --- .../projection/grpc/consumer/scaladsl/GrpcReadJournal.scala | 4 ++-- .../projection/grpc/internal/EventProducerServiceImpl.scala | 2 +- ...erialization.scala => ProjectionGrpcSerialization.scala} | 6 +++--- .../grpc/internal/ProtobufProtocolConversions.scala | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) rename akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/{AkkaProjectionGrpcSerialization.scala => ProjectionGrpcSerialization.scala} (99%) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 328813baf..74c7ca6b5 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -29,7 +29,7 @@ import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides -import akka.projection.grpc.internal.AkkaProjectionGrpcSerialization +import akka.projection.grpc.internal.ProjectionGrpcSerialization import akka.projection.grpc.internal.DelegateToAkkaSerialization import akka.projection.grpc.internal.ConnectionException import akka.projection.grpc.internal.ProtoAnySerialization @@ -191,7 +191,7 @@ final class GrpcReadJournal private ( system: ExtendedActorSystem, settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, - wireSerialization: AkkaProjectionGrpcSerialization, + wireSerialization: ProjectionGrpcSerialization, replicationSettings: Option[ReplicationSettings[_]]) extends ReadJournal with EventsBySliceQuery diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index d9d580fce..f2ba02988 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -98,7 +98,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation .map(s => s"(stream id: [${s.streamId}], entity type: [${s.entityType}])") .mkString(", ")) - private def wireSerialization(eps: EventProducer.EventProducerSource): AkkaProjectionGrpcSerialization = + private def wireSerialization(eps: EventProducer.EventProducerSource): ProjectionGrpcSerialization = if (eps.settings.akkaSerializationOnly) akkaOnlyWireSerialization else protoAnyWireSerialization diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProjectionGrpcSerialization.scala similarity index 99% rename from akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala rename to akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProjectionGrpcSerialization.scala index d95c66082..3962029e9 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/AkkaProjectionGrpcSerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProjectionGrpcSerialization.scala @@ -35,7 +35,7 @@ import scalapb.options.Scalapb * INTERNAL API */ @InternalApi -private[akka] trait AkkaProjectionGrpcSerialization { +private[akka] trait ProjectionGrpcSerialization { def serialize(event: Any): ScalaPbAny def deserialize(scalaPbAny: ScalaPbAny): Any def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] @@ -137,7 +137,7 @@ private[akka] trait AkkaProjectionGrpcSerialization { system: ActorSystem[_], descriptors: immutable.Seq[Descriptors.FileDescriptor], prefer: ProtoAnySerialization.Prefer) - extends AkkaProjectionGrpcSerialization { + extends ProjectionGrpcSerialization { import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) @@ -473,7 +473,7 @@ private[akka] trait AkkaProjectionGrpcSerialization { * INTERNAL API */ @InternalApi -private[akka] final class DelegateToAkkaSerialization(system: ActorSystem[_]) extends AkkaProjectionGrpcSerialization { +private[akka] final class DelegateToAkkaSerialization(system: ActorSystem[_]) extends ProjectionGrpcSerialization { import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 4182e9f3d..298baddbf 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -105,7 +105,7 @@ private[akka] object ProtobufProtocolConversions { def transformAndEncodeEvent( transformation: Transformation, env: EventEnvelope[_], - protoAnySerialization: AkkaProjectionGrpcSerialization)( + protoAnySerialization: ProjectionGrpcSerialization)( implicit executionContext: ExecutionContext): Future[Option[Event]] = { env.eventOption match { case Some(_) => @@ -147,12 +147,12 @@ private[akka] object ProtobufProtocolConversions { } } - def eventToEnvelope[Evt](event: Event, protoAnySerialization: AkkaProjectionGrpcSerialization): EventEnvelope[Evt] = + def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProjectionGrpcSerialization): EventEnvelope[Evt] = eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]] def eventToEnvelope( event: Event, - wireSerialization: AkkaProjectionGrpcSerialization, + wireSerialization: ProjectionGrpcSerialization, deserializeEvent: Boolean): EventEnvelope[Any] = { val eventOffset = populateSeenIfNeeded( TimestampOffset.toTimestampOffset(protocolOffsetToOffset(event.offset)), From a94c67c9798741dd3bfdf9f573914e675b93e753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 3 Oct 2024 10:25:29 +0200 Subject: [PATCH 4/4] fewer apply factories, use built in serializer --- ...ReplicationProtoEventIntegrationSpec.scala | 72 +------------------ .../consumer/javadsl/GrpcReadJournal.scala | 15 ++-- .../consumer/scaladsl/GrpcReadJournal.scala | 42 ++--------- .../internal/ReplicationImpl.scala | 6 +- 4 files changed, 18 insertions(+), 117 deletions(-) diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala index a37fdc0e6..7a04c7e1a 100644 --- a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationProtoEventIntegrationSpec.scala @@ -5,7 +5,6 @@ package akka.projection.grpc.replication import akka.Done -import akka.actor.ExtendedActorSystem import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -32,92 +31,22 @@ import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors import akka.projection.grpc.replication.scaladsl.Replication import akka.projection.grpc.replication.scaladsl.ReplicationSettings import akka.projection.r2dbc.scaladsl.R2dbcReplication -import akka.serialization.BaseSerializer -import akka.serialization.ByteBufferSerializer -import akka.serialization.SerializerWithStringManifest import akka.testkit.SocketUtil -import com.google.protobuf.CodedInputStream -import com.google.protobuf.CodedOutputStream import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory import scalapb.GeneratedMessage -import scalapb.GeneratedMessageCompanion - -import java.nio.ByteBuffer import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.DurationInt object ReplicationProtoEventIntegrationSpec { - class ProtobufSerializer(val system: ExtendedActorSystem) - extends SerializerWithStringManifest - with BaseSerializer - with ByteBufferSerializer { - - private val TagChangedManifest = akka.projection.grpc.test.TagChanged.javaDescriptor.getName - private val GreetingChangedManifest = akka.projection.grpc.test.GreetingChanged.javaDescriptor.getName - - override def manifest(o: AnyRef): String = { - o match { - case msg: GeneratedMessage => msg.companion.javaDescriptor.getName - case _ => - throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") - } - } - - override def toBinary(o: AnyRef): Array[Byte] = { - o match { - case msg: GeneratedMessage => msg.toByteArray - case _ => - throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") - } - } - - override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = { - o match { - case msg: GeneratedMessage => - val output = CodedOutputStream.newInstance(buf) - msg.writeTo(output) - output.flush() - case _ => - throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") - } - } - - override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { - getCompanion(manifest).parseFrom(bytes).asInstanceOf[AnyRef] - } - - override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { - getCompanion(manifest).parseFrom(CodedInputStream.newInstance(buf)).asInstanceOf[AnyRef] - } - - private def getCompanion(manifest: String): GeneratedMessageCompanion[_] = manifest match { - case GreetingChangedManifest => akka.projection.grpc.test.GreetingChanged.messageCompanion - case TagChangedManifest => akka.projection.grpc.test.TagChanged.messageCompanion - case unknown => throw new IllegalArgumentException(s"Unknown manifest $unknown") - } - - } - private def config(dc: ReplicaId): Config = ConfigFactory.parseString(s""" akka.actor.provider = cluster - akka.actor { - serializers { - my-replication-serializer = "akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer" - } - serialization-identifiers { - "akka.projection.grpc.replication.ReplicationProtoEventIntegrationSpec$$ProtobufSerializer" = 1528148901 - } - serialization-bindings { - "scalapb.GeneratedMessage" = my-replication-serializer - } - } akka.http.server.preview.enable-http2 = on akka.persistence.r2dbc { journal.table = "event_journal_${dc.id}" @@ -156,6 +85,7 @@ object ReplicationProtoEventIntegrationSpec { final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command // events are defined directly as proto messages + // Note: uses the default auto-serialization of protobuf generated messages private implicit class ProtoLwwToScalaLww(protoLwwTime: akka.projection.grpc.test.LwwTime) { def toScala: LwwTime = LwwTime(protoLwwTime.timestamp, ReplicaId(protoLwwTime.originReplica)) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala index 6df6b6871..baa4e52be 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala @@ -9,13 +9,12 @@ import java.util import java.util.Optional import java.util.UUID import java.util.concurrent.CompletionStage - import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ - import akka.Done import akka.NotUsed import akka.actor.ClassicActorSystemProvider +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.grpc.GrpcClientSettings @@ -72,14 +71,14 @@ object GrpcReadJournal { clientSettings: GrpcClientSettings, protobufDescriptors: java.util.List[Descriptors.FileDescriptor]): GrpcReadJournal = { import akka.util.ccompat.JavaConverters._ + val protoAnySerialization = + new ProtoAnySerialization( + system.classicSystem.toTyped, + protobufDescriptors.asScala.toVector, + ProtoAnySerialization.Prefer.Java) new GrpcReadJournal( scaladsl - .GrpcReadJournal( - settings, - clientSettings, - protobufDescriptors.asScala.toList, - ProtoAnySerialization.Prefer.Java, - replicationSettings = None)(system)) + .GrpcReadJournal(settings, clientSettings, protoAnySerialization, replicationSettings = None)(system)) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 74c7ca6b5..115118aac 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -8,7 +8,6 @@ import akka.Done import akka.NotUsed import akka.actor.ClassicActorSystemProvider import akka.actor.ExtendedActorSystem -import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi @@ -30,7 +29,6 @@ import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides import akka.projection.grpc.internal.ProjectionGrpcSerialization -import akka.projection.grpc.internal.DelegateToAkkaSerialization import akka.projection.grpc.internal.ConnectionException import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.ProtobufProtocolConversions @@ -112,55 +110,25 @@ object GrpcReadJournal { settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor])( - implicit system: ClassicActorSystemProvider): GrpcReadJournal = - apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala, replicationSettings = None) - - /** - * INTERNAL API - */ - @InternalApi private[akka] def apply( - settings: GrpcQuerySettings, - clientSettings: GrpcClientSettings, - protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor], - protobufPrefer: ProtoAnySerialization.Prefer, - replicationSettings: Option[ReplicationSettings[_]])( implicit system: ClassicActorSystemProvider): GrpcReadJournal = { - - // FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance, - // and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings? - val protoAnySerialization = - new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, protobufPrefer) - - if (settings.initialConsumerFilter.nonEmpty) { - ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter( - settings.streamId, - settings.initialConsumerFilter) - } - - new scaladsl.GrpcReadJournal( - system.classicSystem.asInstanceOf[ExtendedActorSystem], - settings, - withChannelBuilderOverrides(clientSettings), - protoAnySerialization, - replicationSettings) + new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, ProtoAnySerialization.Prefer.Scala) + apply(settings, clientSettings, protoAnySerialization, replicationSettings = None) } /** * INTERNAL API - * - * Factory method for replication, with replication specific serialization */ @InternalApi private[akka] def apply( settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, - replicationSettings: Option[ReplicationSettings[_]])(implicit system: ActorSystem[_]): GrpcReadJournal = { + wireSerialization: ProjectionGrpcSerialization, + replicationSettings: Option[ReplicationSettings[_]])( + implicit system: ClassicActorSystemProvider): GrpcReadJournal = { // FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance, // and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings? - val wireSerialization = new DelegateToAkkaSerialization(system) - if (settings.initialConsumerFilter.nonEmpty) { ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter( settings.streamId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index d72da3f7e..d5875761a 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -36,6 +36,7 @@ import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal +import akka.projection.grpc.internal.DelegateToAkkaSerialization import akka.projection.grpc.producer.scaladsl.EventProducer import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation @@ -164,7 +165,10 @@ private[akka] object ReplicationImpl { case None => s2 } } - val eventsBySlicesQuery = GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, Some(settings)) + + val wireSerialization = new DelegateToAkkaSerialization(system) + val eventsBySlicesQuery = + GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, wireSerialization, Some(settings)) log.infoN( "Starting {} projection streams{} consuming events for Replicated Entity [{}] from [{}] (at {}:{})", remoteReplica.numberOfConsumers,