diff --git a/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java b/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java index 5fd7af66a..ef750d2d0 100644 --- a/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java +++ b/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java @@ -83,7 +83,7 @@ public static void withFilters(ActorSystem system) { EventProducerPushDestination destination = EventProducerPushDestination.create("stream-id", protoDescriptors, system) .withConsumerFilters( - Arrays.asList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature"))) + Collections.singletonList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature"))) ); // #consumerFilters } @@ -94,7 +94,8 @@ public static void withTransformations(ActorSystem system) { EventProducerPushDestination.create("stream-id", protoDescriptors, system) .withTransformationForOrigin((String originId, Metadata metadata) -> Transformation.empty() - .registerPersistenceIdMapper(envelope -> envelope.persistenceId().replace("originalPrefix", "newPrefix")) + .registerPersistenceIdMapper(system, envelope -> + envelope.persistenceId().replace("originalPrefix", "newPrefix")) .registerTagMapper(String.class, envelope -> { Set newTags = new HashSet<>(); newTags.addAll(envelope.getTags()); diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala index a89d25c58..76be7416b 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala @@ -43,6 +43,7 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp ProtoAnySerialization.Prefer.Scala) private val akkaSerialization = SerializationExtension(system.classicSystem) + private val akkaProtobufSerializer = akkaSerialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3]) private val addLineItem = AddLineItem(name = "item", productId = "id", quantity = 10) @@ -60,6 +61,15 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent.getClass shouldBe classOf[com.google.protobuf.Timestamp] deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier + serializedEvent.serializerManifest shouldBe event.getClass.getName + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2.getClass shouldBe classOf[com.google.protobuf.Timestamp] + deserializedEvent2 shouldBe event } "encode and decode ScalaPb proto message" in { @@ -68,6 +78,14 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp pbAny.typeUrl shouldBe "type.googleapis.com/akka.projection.grpc.internal.TestEvent" val deserializedEvent = serializationScala.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationScala.toSerializedEvent(pbAny).get + serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier + serializedEvent.serializerManifest shouldBe event.getClass.getName + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "pass through Java proto Any" in { @@ -84,9 +102,11 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value + + serializationJava.toSerializedEvent(pbAny) shouldBe None } - "pass through ScalaPb Any and decode it as preferred Any Any" in { + "pass through ScalaPb Any and decode it as preferred Any" in { val value = "hello" val typeUrl = "type.my.io/custom" val event = @@ -103,6 +123,9 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEventJava.getTypeUrl shouldBe typeUrl deserializedEventJava.getValue.toString(StandardCharsets.UTF_8) shouldBe value + + serializationScala.toSerializedEvent(pbAny) shouldBe None + serializationJava.toSerializedEvent(pbAny) shouldBe None } "pass through Java proto Any with Google typeUrl" in { @@ -124,6 +147,8 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val deserializedEvent = serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl com.google.protobuf.Timestamp.parseFrom(deserializedEvent.getValue) shouldBe value + + serializationJava.toSerializedEvent(pbAny) shouldBe None } "pass through ScalaPb Any with Google typeUrl" in { @@ -138,18 +163,25 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny] deserializedEvent.typeUrl shouldBe typeUrl TestEvent.parseFrom(deserializedEvent.value.toByteArray) shouldBe value + + serializationScala.toSerializedEvent(pbAny) shouldBe None } "encode and decode with Akka serialization with string manifest" in { val event = Address("akka", system.name, "localhost", 2552) val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) - // no manifest for String serializer - pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}:${Serializers - .manifestFor(serializer, event)}" + val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}:${Serializers.manifestFor(serializer, event)}" + pbAny.typeUrl shouldBe expectedTypeUrl val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "encode and decode with Akka serialization without string manifest" in { @@ -157,10 +189,17 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) // no manifest for String serializer - pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}" + val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}" + pbAny.typeUrl shouldBe expectedTypeUrl val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "support se/deserializing java protobufs" in { diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/SerializedEvent.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/SerializedEvent.excludes new file mode 100644 index 000000000..66b5692aa --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/SerializedEvent.excludes @@ -0,0 +1,7 @@ +# not yet in a published release, or internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.javadsl.Transformation.registerPersistenceIdMapper") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination#Transformation.registerPersistenceIdMapper") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#JavaPbResolvedType.this") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ResolvedType.messageClass") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ResolvedType.akkaSerializer") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ScalaPbResolvedType.this") diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala index cb82a90f7..8a003c101 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala @@ -15,6 +15,8 @@ import java.util.function.{ Function => JFunction } import scala.compat.java8.OptionConverters._ import scala.reflect.ClassTag +import akka.actor.typed.ActorSystem + object Transformation { /** @@ -49,8 +51,8 @@ final class Transformation private (val delegate: scaladsl.EventProducerPushDest * same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers * and break consuming projections. */ - def registerPersistenceIdMapper(f: JFunction[EventEnvelope[Any], String]): Transformation = { - new Transformation(delegate.registerPersistenceIdMapper(f.apply)) + def registerPersistenceIdMapper(system: ActorSystem[_], f: JFunction[EventEnvelope[Any], String]): Transformation = { + new Transformation(delegate.registerPersistenceIdMapper(f.apply)(system)) } /** diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala index d182bf73f..47510f53a 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala @@ -10,7 +10,6 @@ import akka.grpc.scaladsl.Metadata import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.persistence.FilteredPayload -import akka.persistence.query.Offset import akka.persistence.query.typed.EventEnvelope import akka.projection.grpc.consumer.ConsumerFilter.FilterCriteria import akka.projection.grpc.consumer.EventProducerPushDestinationSettings @@ -18,10 +17,12 @@ import akka.projection.grpc.internal.EventPusherConsumerServiceImpl import akka.projection.grpc.internal.ProtoAnySerialization.Prefer import akka.projection.grpc.internal.proto.EventConsumerServicePowerApiHandler import com.google.protobuf.Descriptors - import scala.collection.immutable import scala.reflect.ClassTag +import akka.persistence.Persistence +import akka.persistence.typed.PersistenceId + /** * A passive consumer service for event producer push that can be bound as a gRPC endpoint accepting active producers * pushing events, for example to run a projection piercing firewalls or NAT. Events are pushed directly into the @@ -86,7 +87,7 @@ object EventProducerPushDestination { val mapTags = { (eventEnvelope: EventEnvelope[Any]) => val newTags = f(eventEnvelope.asInstanceOf[EventEnvelope[A]]) if (newTags eq eventEnvelope.tags) eventEnvelope - else copyEnvelope(eventEnvelope)(tags = newTags) + else eventEnvelope.withTags(newTags) } appendMapper(clazz, mapTags) } @@ -96,11 +97,17 @@ object EventProducerPushDestination { * same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers * and break consuming projections. */ - def registerPersistenceIdMapper(f: EventEnvelope[Any] => String): Transformation = { + def registerPersistenceIdMapper(f: EventEnvelope[Any] => String)( + implicit system: ActorSystem[_]): Transformation = { val mapId = { (eventEnvelope: EventEnvelope[Any]) => val newPid = f(eventEnvelope) - if (newPid eq eventEnvelope.persistenceId) eventEnvelope - else copyEnvelope(eventEnvelope)(persistenceId = newPid) + if (newPid eq eventEnvelope.persistenceId) + eventEnvelope + else { + val entityType = PersistenceId.extractEntityType(newPid) + val slice = Persistence(system).sliceForPersistenceId(newPid) + eventEnvelope.withPersistenceId(newPid, entityType, slice) + } } // needs to be untyped since not mapping filtered events the same way will cause gaps in seqnrs new Transformation(typedMappers, untypedMappers.andThen(mapId)) @@ -126,7 +133,7 @@ object EventProducerPushDestination { else { val newMaybePayload = f(eventEnvelope.asInstanceOf[EventEnvelope[A]]) if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope - else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload) + else eventEnvelope.withEventOption(newMaybePayload) } } appendMapper(clazz, mapPayload) @@ -141,7 +148,7 @@ object EventProducerPushDestination { else { val newMaybePayload = f(eventEnvelope) if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope - else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload) + else eventEnvelope.withEventOption(newMaybePayload) } } new Transformation(typedMappers, untypedMappers.andThen(anyPayloadMapper)) @@ -163,30 +170,6 @@ object EventProducerPushDestination { untypedMappers) } - private def copyEnvelope(original: EventEnvelope[Any])( - offset: Offset = original.offset, - persistenceId: String = original.persistenceId, - sequenceNr: Long = original.sequenceNr, - eventOption: Option[Any] = original.eventOption, - timestamp: Long = original.timestamp, - eventMetadata: Option[Any] = original.eventMetadata, - entityType: String = original.entityType, - slice: Int = original.slice, - filtered: Boolean = original.filtered, - source: String = original.source, - tags: Set[String] = original.tags): EventEnvelope[Any] = - new EventEnvelope[Any]( - offset, - persistenceId, - sequenceNr, - eventOption, - timestamp, - eventMetadata, - entityType, - slice, - filtered, - source, - tags) } def grpcServiceHandler(eventConsumer: EventProducerPushDestination)( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 065b60e74..786033495 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -100,9 +100,14 @@ private[akka] final class EventPusherConsumerServiceImpl( interceptedTail .map { consumeEventIn => - if (consumeEventIn.message.isEvent) - ProtobufProtocolConversions.eventToEnvelope[Any](consumeEventIn.getEvent, protoAnySerialization) - else if (consumeEventIn.message.isFilteredEvent) { + if (consumeEventIn.message.isEvent) { + // When no transformation we don't need to deserialize the event and can use SerializedEvent + val deserializeEvent = transformer ne EventProducerPushDestination.Transformation.empty + ProtobufProtocolConversions.eventToEnvelope( + consumeEventIn.getEvent, + protoAnySerialization, + deserializeEvent) + } else if (consumeEventIn.message.isFilteredEvent) { ProtobufProtocolConversions.filteredEventToEnvelope[Any](consumeEventIn.getFilteredEvent) } else { throw new GrpcServiceException(Status.INVALID_ARGUMENT @@ -121,6 +126,9 @@ private[akka] final class EventPusherConsumerServiceImpl( transformedEventEnvelope.persistenceId, if (transformedEventEnvelope.filtered) " filtered" else "") + // Note that when there is no Transformation the event is SerializedEvent, which will be passed + // through the EventWriter and the journal can write that without additional serialization. + destination.eventWriter .askWithStatus[EventWriter.WriteAck](EventWriter.Write( transformedEventEnvelope.persistenceId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala index 030b77115..e337d1f7e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala @@ -11,7 +11,10 @@ import scala.util.Try import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi +import akka.persistence.SerializedEvent import akka.serialization.SerializationExtension +import akka.serialization.Serializer +import akka.serialization.SerializerWithStringManifest import akka.serialization.Serializers import akka.util.ccompat.JavaConverters._ import com.google.common.base.CaseFormat @@ -51,14 +54,18 @@ import scalapb.options.Scalapb */ def parseFrom(bytes: ByteString): T + def messageClass: Class[_] + } - private final class JavaPbResolvedType[T <: Message](parser: Parser[T]) extends ResolvedType[T] { + private final class JavaPbResolvedType[T <: Message](parser: Parser[T], override val messageClass: Class[_]) + extends ResolvedType[T] { override def parseFrom(bytes: ByteString): T = parser.parseFrom(bytes) } private final class ScalaPbResolvedType[T <: scalapb.GeneratedMessage]( - companion: scalapb.GeneratedMessageCompanion[_]) + companion: scalapb.GeneratedMessageCompanion[_], + override val messageClass: Class[_]) extends ResolvedType[T] { override def parseFrom(bytes: ByteString): T = companion.parseFrom(bytes.newCodedInput()).asInstanceOf[T] } @@ -104,10 +111,21 @@ import scalapb.options.Scalapb import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) + private lazy val akkaProtobufSerializer: Serializer = + serialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3]) + private lazy val isDefaultAkkaProtobufSerializer = { + val yes = akkaProtobufSerializer.getClass == classOf[akka.remote.serialization.ProtobufSerializer] + if (yes && akkaProtobufSerializer.isInstanceOf[SerializerWithStringManifest]) { + // just in case it is changed in the future + throw new IllegalStateException("ProtobufSerializer was expected to not be a SerializerWithStringManifest") + } + yes + } private val allDescriptors = flattenDescriptors(descriptors) - private val allTypes = (for { + // proto type name -> Descriptor + private val allTypes: Map[String, Descriptors.Descriptor] = (for { descriptor <- allDescriptors.values messageType <- descriptor.getMessageTypes.asScala } yield messageType.getFullName -> messageType).toMap @@ -139,12 +157,7 @@ import scalapb.options.Scalapb val serializer = serialization.findSerializerFor(otherAnyRef) val manifest = Serializers.manifestFor(serializer, otherAnyRef) val id = serializer.identifier - val typeUrl = - if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$id" - else - s"$AkkaSerializationTypeUrlPrefix$id$AkkaTypeUrlManifestSeparator$manifest" - - ScalaPbAny(typeUrl, ByteString.copyFrom(bytes)) + ScalaPbAny(akkaSerializationTypeUrl(id, manifest), ByteString.copyFrom(bytes)) } } @@ -158,15 +171,7 @@ import scalapb.options.Scalapb } else if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { decodeMessage(scalaPbAny) } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { - val idAndManifest = - typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) - val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) - val (id, manifest) = - if (i == -1) - idAndManifest.toInt -> "" - else - idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) - + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get } else if (prefer == Prefer.Scala) { // when custom typeUrl @@ -177,6 +182,64 @@ import scalapb.options.Scalapb } } + def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = { + // see corresponding typeUrl cases in `deserialize` + + val typeUrl = scalaPbAny.typeUrl + if (typeUrl == ProtoAnyTypeUrl) { + // We don't try to optimize this case. One level of indirection too much, and probably not a common case. + None + } else if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { + val resolvedType = resolveTypeUrl(typeUrl) + akkaSerializerForSerializedEvent(resolvedType.messageClass).map { serializer => + val manifest = if (serializer.includeManifest) resolvedType.messageClass.getName else "" + new SerializedEvent(scalaPbAny.value.toByteArray, serializer.identifier, manifest) + } + } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) + Some(new SerializedEvent(scalaPbAny.value.toByteArray, id, manifest)) + } else { + // We don't try to optimize this case. One level of indirection too much, and probably not a common case. + None + } + } + + private def akkaSerializerForSerializedEvent(messageClass: Class[_]): Option[Serializer] = { + val akkaSerializer = Try(serialization.serializerFor(messageClass)).toOption + akkaSerializer match { + case s @ Some(serializer) => + if (serializer.identifier == akkaProtobufSerializer.identifier && isDefaultAkkaProtobufSerializer) { + // this is the case for akka.remote.serialization.ProtobufSerializer, which is configured by default + // for protobuf messages + s + } else { + // we don't know how what a custom serializer would do + None + } + case None => + log.debug( + "No Akka Serializer for Protobuf message class [{}]. Consider defining it in `akka.actor.serialization-bindings`.", + messageClass.getName) + None + } + } + + private def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = { + if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId" + else + s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest" + } + + private def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = { + val idAndManifest = + typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) + val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) + if (i == -1) + idAndManifest.toInt -> "" + else + idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) + } + private def strippedFileName(fileName: String) = fileName.split(Array('/', '\\')).last.stripSuffix(".proto") @@ -215,7 +278,7 @@ import scalapb.options.Scalapb .getMethod("parser") .invoke(null) .asInstanceOf[Parser[com.google.protobuf.Message]] - Some(new JavaPbResolvedType(parser)) + Some(new JavaPbResolvedType(parser, clazz)) } catch { case cnfe: ClassNotFoundException => @@ -271,7 +334,8 @@ import scalapb.options.Scalapb log.debug("Attempting to load scalapb.GeneratedMessageCompanion object {}", className) val companionObject = system.dynamicAccess.getObjectFor[GeneratedMessageCompanion[GeneratedMessage]](className).get - Some(new ScalaPbResolvedType(companionObject)) + val clazz = system.dynamicAccess.getClassFor[Any](className).get + Some(new ScalaPbResolvedType(companionObject, clazz)) } catch { case cnfe: ClassNotFoundException => log.debug2("Failed to load class [{}] because: {}", className, cnfe.getMessage) @@ -304,8 +368,26 @@ import scalapb.options.Scalapb }) .get - private def resolveTypeUrl(typeName: String): Option[ResolvedType[_]] = + private def tryResolveTypeUrl(typeUrl: String): Option[ResolvedType[_]] = { + val typeName = typeNameFromTypeUrl(typeUrl) allTypes.get(typeName).map(resolveTypeDescriptor) + } + + private def resolveTypeUrl(typeUrl: String): ResolvedType[_] = + tryResolveTypeUrl(typeUrl) match { + case Some(parser) => + parser + case None => + val errMsg = + if (allDescriptors.isEmpty) + s"Unable to find Protobuf descriptor for type: [$typeUrl]. " + + "No descriptors defined when creating GrpcReadJournal. Note that GrpcReadJournal should be created " + + "with the GrpcReadJournal apply/create factory method and not from configuration via GrpcReadJournalProvider " + + "when using Protobuf serialization." + else + s"Unable to find Protobuf descriptor for type: [$typeUrl]." + throw SerializationException(errMsg) + } def encode(value: Any): ScalaPbAny = value match { @@ -338,18 +420,8 @@ import scalapb.options.Scalapb if (!typeUrl.startsWith(GoogleTypeUrlPrefix)) { log.warn2("Message type [{}] does not match type url prefix [{}]", typeUrl, GoogleTypeUrlPrefix) } - val typeName = typeUrl.split("/", 2) match { - case Array(_, typeName) => - typeName - case _ => - log.warn2( - "Message type [{}] does not have a url prefix, it should have one that matchers the type url prefix [{}]", - typeUrl, - GoogleTypeUrlPrefix) - typeUrl - } - resolveTypeUrl(typeName) match { + tryResolveTypeUrl(typeUrl) match { case Some(parser) => parser.parseFrom(any.value) case None => @@ -365,4 +437,17 @@ import scalapb.options.Scalapb } } + private def typeNameFromTypeUrl(typeUrl: String): String = { + typeUrl.split("/", 2) match { + case Array(_, typeName) => + typeName + case _ => + log.warn2( + "Message type [{}] does not have a url prefix, it should have one that matchers the type url prefix [{}]", + typeUrl, + GoogleTypeUrlPrefix) + typeUrl + } + } + } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 42cfcebac..4f0778c0d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -121,25 +121,54 @@ private[akka] object ProtobufProtocolConversions { } } - def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] = { - val eventOffset = protocolOffsetToOffset(event.offset).asInstanceOf[TimestampOffset] - val evt = - event.payload.map(protoAnySerialization.deserialize(_).asInstanceOf[Evt]) + def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] = + eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]] + def eventToEnvelope( + event: Event, + protoAnySerialization: ProtoAnySerialization, + deserializeEvent: Boolean): EventEnvelope[Any] = { + val eventOffset = protocolOffsetToOffset(event.offset).asInstanceOf[TimestampOffset] val metadata: Option[Any] = event.metadata.map(protoAnySerialization.deserialize) - new EventEnvelope( - eventOffset, - event.persistenceId, - event.seqNr, - evt, - eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, - PersistenceId.extractEntityType(event.persistenceId), - event.slice, - filtered = false, - source = event.source, - tags = event.tags.toSet) + def envelopeWithDeserializedEvent: EventEnvelope[Any] = { + val evt = event.payload.map(protoAnySerialization.deserialize) + new EventEnvelope( + eventOffset, + event.persistenceId, + event.seqNr, + evt, + eventOffset.timestamp.toEpochMilli, + eventMetadata = metadata, + PersistenceId.extractEntityType(event.persistenceId), + event.slice, + filtered = false, + source = event.source, + tags = event.tags.toSet) + } + + if (deserializeEvent || event.payload.isEmpty) { + envelopeWithDeserializedEvent + } else { + protoAnySerialization.toSerializedEvent(event.payload.get) match { + case Some(serializedEvent) => + new EventEnvelope( + eventOffset, + event.persistenceId, + event.seqNr, + eventOption = Some(serializedEvent), + eventOffset.timestamp.toEpochMilli, + eventMetadata = metadata, + PersistenceId.extractEntityType(event.persistenceId), + event.slice, + filtered = false, + source = event.source, + tags = event.tags.toSet) + case None => + // couldn't create SerializedEvent without deserialization, fallback to deserializeEvent = true + envelopeWithDeserializedEvent + } + } } def filteredEventToEnvelope[Evt](filtered: FilteredEvent): EventEnvelope[Evt] = {