Skip to content

Commit

Permalink
perf: Avoid event serialization in producer push destination (#966)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Sep 7, 2023
1 parent d94f009 commit 51abe98
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static void withFilters(ActorSystem<?> system) {
EventProducerPushDestination destination =
EventProducerPushDestination.create("stream-id", protoDescriptors, system)
.withConsumerFilters(
Arrays.asList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature")))
Collections.singletonList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature")))
);
// #consumerFilters
}
Expand All @@ -94,7 +94,8 @@ public static void withTransformations(ActorSystem<?> system) {
EventProducerPushDestination.create("stream-id", protoDescriptors, system)
.withTransformationForOrigin((String originId, Metadata metadata) ->
Transformation.empty()
.registerPersistenceIdMapper(envelope -> envelope.persistenceId().replace("originalPrefix", "newPrefix"))
.registerPersistenceIdMapper(system, envelope ->
envelope.persistenceId().replace("originalPrefix", "newPrefix"))
.registerTagMapper(String.class, envelope -> {
Set<String> newTags = new HashSet<>();
newTags.addAll(envelope.getTags());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
ProtoAnySerialization.Prefer.Scala)

private val akkaSerialization = SerializationExtension(system.classicSystem)
private val akkaProtobufSerializer = akkaSerialization.serializerFor(classOf[com.google.protobuf.GeneratedMessageV3])

private val addLineItem = AddLineItem(name = "item", productId = "id", quantity = 10)

Expand All @@ -60,6 +61,15 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent.getClass shouldBe classOf[com.google.protobuf.Timestamp]
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier
serializedEvent.serializerManifest shouldBe event.getClass.getName
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2.getClass shouldBe classOf[com.google.protobuf.Timestamp]
deserializedEvent2 shouldBe event
}

"encode and decode ScalaPb proto message" in {
Expand All @@ -68,6 +78,14 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
pbAny.typeUrl shouldBe "type.googleapis.com/akka.projection.grpc.internal.TestEvent"
val deserializedEvent = serializationScala.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationScala.toSerializedEvent(pbAny).get
serializedEvent.serializerId shouldBe akkaProtobufSerializer.identifier
serializedEvent.serializerManifest shouldBe event.getClass.getName
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"pass through Java proto Any" in {
Expand All @@ -84,9 +102,11 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEvent.getTypeUrl shouldBe typeUrl
deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value

serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through ScalaPb Any and decode it as preferred Any Any" in {
"pass through ScalaPb Any and decode it as preferred Any" in {
val value = "hello"
val typeUrl = "type.my.io/custom"
val event =
Expand All @@ -103,6 +123,9 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEventJava.getTypeUrl shouldBe typeUrl
deserializedEventJava.getValue.toString(StandardCharsets.UTF_8) shouldBe value

serializationScala.toSerializedEvent(pbAny) shouldBe None
serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through Java proto Any with Google typeUrl" in {
Expand All @@ -124,6 +147,8 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
val deserializedEvent = serializationJava.deserialize(pbAny).asInstanceOf[PbAny]
deserializedEvent.getTypeUrl shouldBe typeUrl
com.google.protobuf.Timestamp.parseFrom(deserializedEvent.getValue) shouldBe value

serializationJava.toSerializedEvent(pbAny) shouldBe None
}

"pass through ScalaPb Any with Google typeUrl" in {
Expand All @@ -138,29 +163,43 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp
serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny]
deserializedEvent.typeUrl shouldBe typeUrl
TestEvent.parseFrom(deserializedEvent.value.toByteArray) shouldBe value

serializationScala.toSerializedEvent(pbAny) shouldBe None
}

"encode and decode with Akka serialization with string manifest" in {
val event = Address("akka", system.name, "localhost", 2552)
val pbAny = serializationJava.serialize(event)
val serializer = akkaSerialization.findSerializerFor(event)
// no manifest for String serializer
pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}:${Serializers
.manifestFor(serializer, event)}"
val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}:${Serializers.manifestFor(serializer, event)}"
pbAny.typeUrl shouldBe expectedTypeUrl

val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"encode and decode with Akka serialization without string manifest" in {
val event = "e1"
val pbAny = serializationJava.serialize(event)
val serializer = akkaSerialization.findSerializerFor(event)
// no manifest for String serializer
pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}"
val expectedTypeUrl = s"ser.akka.io/${serializer.identifier}"
pbAny.typeUrl shouldBe expectedTypeUrl

val deserializedEvent = serializationJava.deserialize(pbAny)
deserializedEvent shouldBe event

val serializedEvent = serializationJava.toSerializedEvent(pbAny).get
val deserializedEvent2 = akkaSerialization
.deserialize(serializedEvent.bytes, serializedEvent.serializerId, serializedEvent.serializerManifest)
.get
deserializedEvent2 shouldBe event
}

"support se/deserializing java protobufs" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# not yet in a published release, or internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.javadsl.Transformation.registerPersistenceIdMapper")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination#Transformation.registerPersistenceIdMapper")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#JavaPbResolvedType.this")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ResolvedType.messageClass")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ResolvedType.akkaSerializer")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.ProtoAnySerialization#ScalaPbResolvedType.this")
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import java.util.function.{ Function => JFunction }
import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag

import akka.actor.typed.ActorSystem

object Transformation {

/**
Expand Down Expand Up @@ -49,8 +51,8 @@ final class Transformation private (val delegate: scaladsl.EventProducerPushDest
* same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers
* and break consuming projections.
*/
def registerPersistenceIdMapper(f: JFunction[EventEnvelope[Any], String]): Transformation = {
new Transformation(delegate.registerPersistenceIdMapper(f.apply))
def registerPersistenceIdMapper(system: ActorSystem[_], f: JFunction[EventEnvelope[Any], String]): Transformation = {
new Transformation(delegate.registerPersistenceIdMapper(f.apply)(system))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import akka.grpc.scaladsl.Metadata
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.FilteredPayload
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.consumer.ConsumerFilter.FilterCriteria
import akka.projection.grpc.consumer.EventProducerPushDestinationSettings
import akka.projection.grpc.internal.EventPusherConsumerServiceImpl
import akka.projection.grpc.internal.ProtoAnySerialization.Prefer
import akka.projection.grpc.internal.proto.EventConsumerServicePowerApiHandler
import com.google.protobuf.Descriptors

import scala.collection.immutable
import scala.reflect.ClassTag

import akka.persistence.Persistence
import akka.persistence.typed.PersistenceId

/**
* A passive consumer service for event producer push that can be bound as a gRPC endpoint accepting active producers
* pushing events, for example to run a projection piercing firewalls or NAT. Events are pushed directly into the
Expand Down Expand Up @@ -86,7 +87,7 @@ object EventProducerPushDestination {
val mapTags = { (eventEnvelope: EventEnvelope[Any]) =>
val newTags = f(eventEnvelope.asInstanceOf[EventEnvelope[A]])
if (newTags eq eventEnvelope.tags) eventEnvelope
else copyEnvelope(eventEnvelope)(tags = newTags)
else eventEnvelope.withTags(newTags)
}
appendMapper(clazz, mapTags)
}
Expand All @@ -96,11 +97,17 @@ object EventProducerPushDestination {
* same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers
* and break consuming projections.
*/
def registerPersistenceIdMapper(f: EventEnvelope[Any] => String): Transformation = {
def registerPersistenceIdMapper(f: EventEnvelope[Any] => String)(
implicit system: ActorSystem[_]): Transformation = {
val mapId = { (eventEnvelope: EventEnvelope[Any]) =>
val newPid = f(eventEnvelope)
if (newPid eq eventEnvelope.persistenceId) eventEnvelope
else copyEnvelope(eventEnvelope)(persistenceId = newPid)
if (newPid eq eventEnvelope.persistenceId)
eventEnvelope
else {
val entityType = PersistenceId.extractEntityType(newPid)
val slice = Persistence(system).sliceForPersistenceId(newPid)
eventEnvelope.withPersistenceId(newPid, entityType, slice)
}
}
// needs to be untyped since not mapping filtered events the same way will cause gaps in seqnrs
new Transformation(typedMappers, untypedMappers.andThen(mapId))
Expand All @@ -126,7 +133,7 @@ object EventProducerPushDestination {
else {
val newMaybePayload = f(eventEnvelope.asInstanceOf[EventEnvelope[A]])
if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope
else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload)
else eventEnvelope.withEventOption(newMaybePayload)
}
}
appendMapper(clazz, mapPayload)
Expand All @@ -141,7 +148,7 @@ object EventProducerPushDestination {
else {
val newMaybePayload = f(eventEnvelope)
if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope
else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload)
else eventEnvelope.withEventOption(newMaybePayload)
}
}
new Transformation(typedMappers, untypedMappers.andThen(anyPayloadMapper))
Expand All @@ -163,30 +170,6 @@ object EventProducerPushDestination {
untypedMappers)
}

private def copyEnvelope(original: EventEnvelope[Any])(
offset: Offset = original.offset,
persistenceId: String = original.persistenceId,
sequenceNr: Long = original.sequenceNr,
eventOption: Option[Any] = original.eventOption,
timestamp: Long = original.timestamp,
eventMetadata: Option[Any] = original.eventMetadata,
entityType: String = original.entityType,
slice: Int = original.slice,
filtered: Boolean = original.filtered,
source: String = original.source,
tags: Set[String] = original.tags): EventEnvelope[Any] =
new EventEnvelope[Any](
offset,
persistenceId,
sequenceNr,
eventOption,
timestamp,
eventMetadata,
entityType,
slice,
filtered,
source,
tags)
}

def grpcServiceHandler(eventConsumer: EventProducerPushDestination)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ private[akka] final class EventPusherConsumerServiceImpl(

interceptedTail
.map { consumeEventIn =>
if (consumeEventIn.message.isEvent)
ProtobufProtocolConversions.eventToEnvelope[Any](consumeEventIn.getEvent, protoAnySerialization)
else if (consumeEventIn.message.isFilteredEvent) {
if (consumeEventIn.message.isEvent) {
// When no transformation we don't need to deserialize the event and can use SerializedEvent
val deserializeEvent = transformer ne EventProducerPushDestination.Transformation.empty
ProtobufProtocolConversions.eventToEnvelope(
consumeEventIn.getEvent,
protoAnySerialization,
deserializeEvent)
} else if (consumeEventIn.message.isFilteredEvent) {
ProtobufProtocolConversions.filteredEventToEnvelope[Any](consumeEventIn.getFilteredEvent)
} else {
throw new GrpcServiceException(Status.INVALID_ARGUMENT
Expand All @@ -121,6 +126,9 @@ private[akka] final class EventPusherConsumerServiceImpl(
transformedEventEnvelope.persistenceId,
if (transformedEventEnvelope.filtered) " filtered" else "")

// Note that when there is no Transformation the event is SerializedEvent, which will be passed
// through the EventWriter and the journal can write that without additional serialization.

destination.eventWriter
.askWithStatus[EventWriter.WriteAck](EventWriter.Write(
transformedEventEnvelope.persistenceId,
Expand Down
Loading

0 comments on commit 51abe98

Please sign in to comment.