From 84e19c646cd701d2b21dc637fcc953ff09e3b63c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 Aug 2023 15:05:25 +0200 Subject: [PATCH] perf: Avoid event serialization in producer push destination --- .../internal/ProtoAnySerializationSpec.scala | 59 ++++++- .../EventPusherConsumerServiceImpl.scala | 14 +- .../grpc/internal/ProtoAnySerialization.scala | 163 ++++++++++++++---- .../ProtobufProtocolConversions.scala | 61 +++++-- 4 files changed, 240 insertions(+), 57 deletions(-) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala index a89d25c58..529f2c071 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala @@ -21,7 +21,15 @@ import org.scalatest.wordspec.AnyWordSpecLike import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.{ Any => PbAny } -class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { +class ProtoAnySerializationSpec + extends ScalaTestWithActorTestKit(""" + akka.actor.serialization-bindings { + # // FIXME important to document this for the SerializedEvent optimization to work + "scalapb.GeneratedMessage" = proto + } + """) + with AnyWordSpecLike + with LogCapturing { private val serializationJava = new ProtoAnySerialization( @@ -43,6 +51,7 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp ProtoAnySerialization.Prefer.Scala) private val akkaSerialization = SerializationExtension(system.classicSystem) + private val akkaProtobufSerializer = akkaSerialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3]) private val addLineItem = AddLineItem(name = "item", productId = "id", quantity = 10) @@ -60,6 +69,15 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent.getClass shouldBe classOf[com.google.protobuf.Timestamp] deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier + serializedEvent.serializerManifest shouldBe event.getClass.getName + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2.getClass shouldBe classOf[com.google.protobuf.Timestamp] + deserializedEvent2 shouldBe event } "encode and decode ScalaPb proto message" in { @@ -68,6 +86,14 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp pbAny.typeUrl shouldBe "type.googleapis.com/akka.projection.grpc.internal.TestEvent" val deserializedEvent = serializationScala.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationScala.toSerializedEvent(pbAny).get + serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier + serializedEvent.serializerManifest shouldBe event.getClass.getName + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "pass through Java proto Any" in { @@ -84,9 +110,11 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value + + serializationJava.toSerializedEvent(pbAny) shouldBe None } - "pass through ScalaPb Any and decode it as preferred Any Any" in { + "pass through ScalaPb Any and decode it as preferred Any" in { val value = "hello" val typeUrl = "type.my.io/custom" val event = @@ -103,6 +131,9 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEventJava.getTypeUrl shouldBe typeUrl deserializedEventJava.getValue.toString(StandardCharsets.UTF_8) shouldBe value + + serializationScala.toSerializedEvent(pbAny) shouldBe None + serializationJava.toSerializedEvent(pbAny) shouldBe None } "pass through Java proto Any with Google typeUrl" in { @@ -124,6 +155,8 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val deserializedEvent = serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl com.google.protobuf.Timestamp.parseFrom(deserializedEvent.getValue) shouldBe value + + serializationJava.toSerializedEvent(pbAny) shouldBe None } "pass through ScalaPb Any with Google typeUrl" in { @@ -138,18 +171,25 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny] deserializedEvent.typeUrl shouldBe typeUrl TestEvent.parseFrom(deserializedEvent.value.toByteArray) shouldBe value + + serializationScala.toSerializedEvent(pbAny) shouldBe None } "encode and decode with Akka serialization with string manifest" in { val event = Address("akka", system.name, "localhost", 2552) val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) - // no manifest for String serializer - pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}:${Serializers - .manifestFor(serializer, event)}" + val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}:${Serializers.manifestFor(serializer, event)}" + pbAny.typeUrl shouldBe expectedTypeUrl val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "encode and decode with Akka serialization without string manifest" in { @@ -157,10 +197,17 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) // no manifest for String serializer - pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}" + val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}" + pbAny.typeUrl shouldBe expectedTypeUrl val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event + + val serializedEvent = serializationJava.toSerializedEvent(pbAny).get + val deserializedEvent2 = akkaSerialization + .deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest) + .get + deserializedEvent2 shouldBe event } "support se/deserializing java protobufs" in { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 065b60e74..786033495 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -100,9 +100,14 @@ private[akka] final class EventPusherConsumerServiceImpl( interceptedTail .map { consumeEventIn => - if (consumeEventIn.message.isEvent) - ProtobufProtocolConversions.eventToEnvelope[Any](consumeEventIn.getEvent, protoAnySerialization) - else if (consumeEventIn.message.isFilteredEvent) { + if (consumeEventIn.message.isEvent) { + // When no transformation we don't need to deserialize the event and can use SerializedEvent + val deserializeEvent = transformer ne EventProducerPushDestination.Transformation.empty + ProtobufProtocolConversions.eventToEnvelope( + consumeEventIn.getEvent, + protoAnySerialization, + deserializeEvent) + } else if (consumeEventIn.message.isFilteredEvent) { ProtobufProtocolConversions.filteredEventToEnvelope[Any](consumeEventIn.getFilteredEvent) } else { throw new GrpcServiceException(Status.INVALID_ARGUMENT @@ -121,6 +126,9 @@ private[akka] final class EventPusherConsumerServiceImpl( transformedEventEnvelope.persistenceId, if (transformedEventEnvelope.filtered) " filtered" else "") + // Note that when there is no Transformation the event is SerializedEvent, which will be passed + // through the EventWriter and the journal can write that without additional serialization. + destination.eventWriter .askWithStatus[EventWriter.WriteAck](EventWriter.Write( transformedEventEnvelope.persistenceId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala index 030b77115..1e28dc7c9 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala @@ -11,7 +11,11 @@ import scala.util.Try import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi +import akka.persistence.SerializedEvent +import akka.serialization.DisabledJavaSerializer import akka.serialization.SerializationExtension +import akka.serialization.Serializer +import akka.serialization.SerializerWithStringManifest import akka.serialization.Serializers import akka.util.ccompat.JavaConverters._ import com.google.common.base.CaseFormat @@ -51,14 +55,24 @@ import scalapb.options.Scalapb */ def parseFrom(bytes: ByteString): T + def messageClass: Class[_] + + def akkaSerializer: Option[Serializer] + } - private final class JavaPbResolvedType[T <: Message](parser: Parser[T]) extends ResolvedType[T] { + private final class JavaPbResolvedType[T <: Message]( + parser: Parser[T], + override val messageClass: Class[_], + override val akkaSerializer: Option[Serializer]) + extends ResolvedType[T] { override def parseFrom(bytes: ByteString): T = parser.parseFrom(bytes) } private final class ScalaPbResolvedType[T <: scalapb.GeneratedMessage]( - companion: scalapb.GeneratedMessageCompanion[_]) + companion: scalapb.GeneratedMessageCompanion[_], + override val messageClass: Class[_], + override val akkaSerializer: Option[Serializer]) extends ResolvedType[T] { override def parseFrom(bytes: ByteString): T = companion.parseFrom(bytes.newCodedInput()).asInstanceOf[T] } @@ -104,10 +118,21 @@ import scalapb.options.Scalapb import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) + private lazy val akkaProtobufSerializer: Serializer = + serialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3]) + private lazy val isDefaultAkkaProtobufSerializer = { + val yes = akkaProtobufSerializer.getClass == classOf[akka.remote.serialization.ProtobufSerializer] + if (yes && akkaProtobufSerializer.isInstanceOf[SerializerWithStringManifest]) { + // just in case it is changed in the future + throw new IllegalStateException("ProtobufSerializer was expected to not be a SerializerWithStringManifest") + } + yes + } private val allDescriptors = flattenDescriptors(descriptors) - private val allTypes = (for { + // proto type name -> Descriptor + private val allTypes: Map[String, Descriptors.Descriptor] = (for { descriptor <- allDescriptors.values messageType <- descriptor.getMessageTypes.asScala } yield messageType.getFullName -> messageType).toMap @@ -139,12 +164,7 @@ import scalapb.options.Scalapb val serializer = serialization.findSerializerFor(otherAnyRef) val manifest = Serializers.manifestFor(serializer, otherAnyRef) val id = serializer.identifier - val typeUrl = - if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$id" - else - s"$AkkaSerializationTypeUrlPrefix$id$AkkaTypeUrlManifestSeparator$manifest" - - ScalaPbAny(typeUrl, ByteString.copyFrom(bytes)) + ScalaPbAny(akkaSerializationTypeUrl(id, manifest), ByteString.copyFrom(bytes)) } } @@ -158,15 +178,7 @@ import scalapb.options.Scalapb } else if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { decodeMessage(scalaPbAny) } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { - val idAndManifest = - typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) - val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) - val (id, manifest) = - if (i == -1) - idAndManifest.toInt -> "" - else - idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) - + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get } else if (prefer == Prefer.Scala) { // when custom typeUrl @@ -177,6 +189,57 @@ import scalapb.options.Scalapb } } + def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = { + // see corresponding typeUrl cases in `deserialize` + + val typeUrl = scalaPbAny.typeUrl + if (typeUrl == ProtoAnyTypeUrl) { + // We don't try to optimize this case. One level of indirection too much, and probably not a common case. + None + } else if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { + val resolvedType = resolveTypeUrl(typeUrl) + resolvedType.akkaSerializer match { + case Some(serializer) => + if (serializer.identifier == akkaProtobufSerializer.identifier && isDefaultAkkaProtobufSerializer) { + // this is the case for akka.remote.serialization.ProtobufSerializer, which is configured by default + // for protobuf messages + val manifest = if (serializer.includeManifest) resolvedType.messageClass.getName else "" + Some(new SerializedEvent(scalaPbAny.value.toByteArray, serializer.identifier, manifest)) + } else { + // we don't know how what a custom serializer would do + None + } + case None => + log.debug( + "No Akka Serializer for Protobuf message class [{}]. Consider defining it in `akka.actor.serialization-bindings`.", + resolvedType.messageClass.getName) + None + } + } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { + val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl) + Some(new SerializedEvent(scalaPbAny.value.toByteArray, id, manifest)) + } else { + // We don't try to optimize this case. One level of indirection too much, and probably not a common case. + None + } + } + + private def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = { + if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId" + else + s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest" + } + + private def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = { + val idAndManifest = + typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) + val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator) + if (i == -1) + idAndManifest.toInt -> "" + else + idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) + } + private def strippedFileName(fileName: String) = fileName.split(Array('/', '\\')).last.stripSuffix(".proto") @@ -215,7 +278,20 @@ import scalapb.options.Scalapb .getMethod("parser") .invoke(null) .asInstanceOf[Parser[com.google.protobuf.Message]] - Some(new JavaPbResolvedType(parser)) + val akkaSerializer = Try(serialization.serializerFor(clazz)).toOption + .flatMap { + case _: DisabledJavaSerializer => + // There will be a warning log from Akka "Using the Java serializer", which need additional clarification + // in this context. + log.warn( + "Missing Akka serializer for protobuf message class [{}]. Consider defining it in " + + "`akka.actor.serialization-bindings`. It was bound to disabled Java serializer, which " + + "is not what you want.", + clazz.getName) + None + case other => Some(other) + } + Some(new JavaPbResolvedType(parser, clazz, akkaSerializer)) } catch { case cnfe: ClassNotFoundException => @@ -271,7 +347,9 @@ import scalapb.options.Scalapb log.debug("Attempting to load scalapb.GeneratedMessageCompanion object {}", className) val companionObject = system.dynamicAccess.getObjectFor[GeneratedMessageCompanion[GeneratedMessage]](className).get - Some(new ScalaPbResolvedType(companionObject)) + val clazz = system.dynamicAccess.getClassFor[Any](className).get + val akkaSerializer = Try(serialization.serializerFor(clazz)).toOption + Some(new ScalaPbResolvedType(companionObject, clazz, akkaSerializer)) } catch { case cnfe: ClassNotFoundException => log.debug2("Failed to load class [{}] because: {}", className, cnfe.getMessage) @@ -304,8 +382,26 @@ import scalapb.options.Scalapb }) .get - private def resolveTypeUrl(typeName: String): Option[ResolvedType[_]] = + private def tryResolveTypeUrl(typeUrl: String): Option[ResolvedType[_]] = { + val typeName = typeNameFromTypeUrl(typeUrl) allTypes.get(typeName).map(resolveTypeDescriptor) + } + + private def resolveTypeUrl(typeUrl: String): ResolvedType[_] = + tryResolveTypeUrl(typeUrl) match { + case Some(parser) => + parser + case None => + val errMsg = + if (allDescriptors.isEmpty) + s"Unable to find Protobuf descriptor for type: [$typeUrl]. " + + "No descriptors defined when creating GrpcReadJournal. Note that GrpcReadJournal should be created " + + "with the GrpcReadJournal apply/create factory method and not from configuration via GrpcReadJournalProvider " + + "when using Protobuf serialization." + else + s"Unable to find Protobuf descriptor for type: [$typeUrl]." + throw SerializationException(errMsg) + } def encode(value: Any): ScalaPbAny = value match { @@ -338,18 +434,8 @@ import scalapb.options.Scalapb if (!typeUrl.startsWith(GoogleTypeUrlPrefix)) { log.warn2("Message type [{}] does not match type url prefix [{}]", typeUrl, GoogleTypeUrlPrefix) } - val typeName = typeUrl.split("/", 2) match { - case Array(_, typeName) => - typeName - case _ => - log.warn2( - "Message type [{}] does not have a url prefix, it should have one that matchers the type url prefix [{}]", - typeUrl, - GoogleTypeUrlPrefix) - typeUrl - } - resolveTypeUrl(typeName) match { + tryResolveTypeUrl(typeUrl) match { case Some(parser) => parser.parseFrom(any.value) case None => @@ -365,4 +451,17 @@ import scalapb.options.Scalapb } } + private def typeNameFromTypeUrl(typeUrl: String): String = { + typeUrl.split("/", 2) match { + case Array(_, typeName) => + typeName + case _ => + log.warn2( + "Message type [{}] does not have a url prefix, it should have one that matchers the type url prefix [{}]", + typeUrl, + GoogleTypeUrlPrefix) + typeUrl + } + } + } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 42cfcebac..4f0778c0d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -121,25 +121,54 @@ private[akka] object ProtobufProtocolConversions { } } - def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] = { - val eventOffset = protocolOffsetToOffset(event.offset).asInstanceOf[TimestampOffset] - val evt = - event.payload.map(protoAnySerialization.deserialize(_).asInstanceOf[Evt]) + def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] = + eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]] + def eventToEnvelope( + event: Event, + protoAnySerialization: ProtoAnySerialization, + deserializeEvent: Boolean): EventEnvelope[Any] = { + val eventOffset = protocolOffsetToOffset(event.offset).asInstanceOf[TimestampOffset] val metadata: Option[Any] = event.metadata.map(protoAnySerialization.deserialize) - new EventEnvelope( - eventOffset, - event.persistenceId, - event.seqNr, - evt, - eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, - PersistenceId.extractEntityType(event.persistenceId), - event.slice, - filtered = false, - source = event.source, - tags = event.tags.toSet) + def envelopeWithDeserializedEvent: EventEnvelope[Any] = { + val evt = event.payload.map(protoAnySerialization.deserialize) + new EventEnvelope( + eventOffset, + event.persistenceId, + event.seqNr, + evt, + eventOffset.timestamp.toEpochMilli, + eventMetadata = metadata, + PersistenceId.extractEntityType(event.persistenceId), + event.slice, + filtered = false, + source = event.source, + tags = event.tags.toSet) + } + + if (deserializeEvent || event.payload.isEmpty) { + envelopeWithDeserializedEvent + } else { + protoAnySerialization.toSerializedEvent(event.payload.get) match { + case Some(serializedEvent) => + new EventEnvelope( + eventOffset, + event.persistenceId, + event.seqNr, + eventOption = Some(serializedEvent), + eventOffset.timestamp.toEpochMilli, + eventMetadata = metadata, + PersistenceId.extractEntityType(event.persistenceId), + event.slice, + filtered = false, + source = event.source, + tags = event.tags.toSet) + case None => + // couldn't create SerializedEvent without deserialization, fallback to deserializeEvent = true + envelopeWithDeserializedEvent + } + } } def filteredEventToEnvelope[Evt](filtered: FilteredEvent): EventEnvelope[Evt] = {