Skip to content

Commit

Permalink
perf: Avoid event serialization in producer push destination
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 5, 2023
1 parent 85f3325 commit 84e19c6
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ import org.scalatest.wordspec.AnyWordSpecLike
import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.{ Any => PbAny }

class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
class ProtoAnySerializationSpec
extends ScalaTestWithActorTestKit("""
akka.actor.serialization-bindings {
# // FIXME important to document this for the SerializedEvent optimization to work
"scalapb.GeneratedMessage" = proto
}
""")
with AnyWordSpecLike
with LogCapturing {

private val serializationJava =
new ProtoAnySerialization(
Expand All @@ -43,6 +51,7 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
ProtoAnySerialization.Prefer.Scala)

private val akkaSerialization = SerializationExtension(system.classicSystem)
private val akkaProtobufSerializer = akkaSerialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3])

private val addLineItem = AddLineItem(name = "item", productId = "id", quantity = 10)

Expand All @@ -60,6 +69,15 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent.getClass shouldBe classOf[com.google.protobuf.Timestamp]
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier
serializedEvent.serializerManifest shouldBe event.getClass.getName
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2.getClass shouldBe classOf[com.google.protobuf.Timestamp]
deserializedEvent2 shouldBe event
}

"encode and decode ScalaPb proto message" in {
Expand All @@ -68,6 +86,14 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
pbAny.typeUrl shouldBe "type.googleapis.com/akka.projection.grpc.internal.TestEvent"
val deserializedEvent = serializationScala.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationScala.toSerializedEvent(pbAny).get
serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier
serializedEvent.serializerManifest shouldBe event.getClass.getName
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"pass through Java proto Any" in {
Expand All @@ -84,9 +110,11 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEvent.getTypeUrl shouldBe typeUrl
deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value

serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through ScalaPb Any and decode it as preferred Any Any" in {
"pass through ScalaPb Any and decode it as preferred Any" in {
val value = "hello"
val typeUrl = "type.my.io/custom"
val event =
Expand All @@ -103,6 +131,9 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEventJava.getTypeUrl shouldBe typeUrl
deserializedEventJava.getValue.toString(StandardCharsets.UTF_8) shouldBe value

serializationScala.toSerializedEvent(pbAny) shouldBe None
serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through Java proto Any with Google typeUrl" in {
Expand All @@ -124,6 +155,8 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
val deserializedEvent = serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEvent.getTypeUrl shouldBe typeUrl
com.google.protobuf.Timestamp.parseFrom(deserializedEvent.getValue) shouldBe value

serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through ScalaPb Any with Google typeUrl" in {
Expand All @@ -138,29 +171,43 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny]
deserializedEvent.typeUrl shouldBe typeUrl
TestEvent.parseFrom(deserializedEvent.value.toByteArray) shouldBe value

serializationScala.toSerializedEvent(pbAny) shouldBe None
}

"encode and decode with Akka serialization with string manifest" in {
val event = Address("akka", system.name, "localhost", 2552)
val pbAny = serializationJava.serialize(event)
val serializer = akkaSerialization.findSerializerFor(event)
// no manifest for String serializer
pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}:${Serializers
.manifestFor(serializer, event)}"
val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}:${Serializers.manifestFor(serializer, event)}"
pbAny.typeUrl shouldBe expectedTypeUrl

val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"encode and decode with Akka serialization without string manifest" in {
val event = "e1"
val pbAny = serializationJava.serialize(event)
val serializer = akkaSerialization.findSerializerFor(event)
// no manifest for String serializer
pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}"
val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}"
pbAny.typeUrl shouldBe expectedTypeUrl

val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"support se/deserializing java protobufs" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ private[akka] final class EventPusherConsumerServiceImpl(

interceptedTail
.map { consumeEventIn =>
if (consumeEventIn.message.isEvent)
ProtobufProtocolConversions.eventToEnvelope[Any](consumeEventIn.getEvent, protoAnySerialization)
else if (consumeEventIn.message.isFilteredEvent) {
if (consumeEventIn.message.isEvent) {
// When no transformation we don't need to deserialize the event and can use SerializedEvent
val deserializeEvent = transformer ne EventProducerPushDestination.Transformation.empty
ProtobufProtocolConversions.eventToEnvelope(
consumeEventIn.getEvent,
protoAnySerialization,
deserializeEvent)
} else if (consumeEventIn.message.isFilteredEvent) {
ProtobufProtocolConversions.filteredEventToEnvelope[Any](consumeEventIn.getFilteredEvent)
} else {
throw new GrpcServiceException(Status.INVALID_ARGUMENT
Expand All @@ -121,6 +126,9 @@ private[akka] final class EventPusherConsumerServiceImpl(
transformedEventEnvelope.persistenceId,
if (transformedEventEnvelope.filtered) " filtered" else "")

// Note that when there is no Transformation the event is SerializedEvent, which will be passed
// through the EventWriter and the journal can write that without additional serialization.

destination.eventWriter
.askWithStatus[EventWriter.WriteAck](EventWriter.Write(
transformedEventEnvelope.persistenceId,
Expand Down
Loading

0 comments on commit 84e19c6

Please sign in to comment.