fix: Do not special-case protobuf payloads for replication
# Conflicts:
#	akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala
johanandren committed Oct 1, 2024
1 parent 38e01c5 commit f122c68
Showing 7 changed files with 156 additions and 50 deletions.
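The gist of the change, as a minimal sketch: the producer side now chooses the wire serialization per event producer source, and the protobuf-Any handling is used only when the source is not restricted to Akka serialization. The helper name below is hypothetical; the setting and class names come from the changed files, which are internal API.

import akka.actor.typed.ActorSystem
import akka.projection.grpc.internal.AkkaProjectionGrpcSerialization
import akka.projection.grpc.internal.DelegateToAkkaSerialization
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.producer.scaladsl.EventProducer

object WireSerializationSelection {
  // Hypothetical helper mirroring the per-source selection introduced by this commit
  def select(eps: EventProducer.EventProducerSource)(
      implicit system: ActorSystem[_]): AkkaProjectionGrpcSerialization =
    if (eps.settings.akkaSerializationOnly)
      new DelegateToAkkaSerialization(system) // replication: delegate straight to Akka serialization
    else
      new ProtoAnySerialization(system) // direct consumers: protobuf Any, with Akka serialization as fallback
}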
@@ -8,6 +8,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
@@ -28,6 +29,8 @@ import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.AkkaProjectionGrpcSerialization
import akka.projection.grpc.internal.DelegateToAkkaSerialization
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
@@ -53,14 +56,13 @@ import io.grpc.Status
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
@@ -144,6 +146,35 @@ object GrpcReadJournal {
replicationSettings)
}

/**
* INTERNAL API
*
* Factory method for replication, with replication-specific serialization
*/
@InternalApi private[akka] def apply(
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
replicationSettings: Option[ReplicationSettings[_]])(implicit system: ActorSystem[_]): GrpcReadJournal = {

// FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance,
// and therefore one grpc client for each. Is that fine, or should the client be shared for the same clientSettings?

val wireSerialization = new DelegateToAkkaSerialization(system)

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter(
settings.streamId,
settings.initialConsumerFilter)
}

new scaladsl.GrpcReadJournal(
system.classicSystem.asInstanceOf[ExtendedActorSystem],
settings,
withChannelBuilderOverrides(clientSettings),
wireSerialization,
replicationSettings)
}
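// Hypothetical wiring sketch for the factory above, for illustration only: it is internal API
// (private[akka]) and normally invoked by the replication setup; the stream id and gRPC client
// config name below are made up.
//
//   def replicationJournal(replicationSettings: ReplicationSettings[_])(
//       implicit system: ActorSystem[_]): GrpcReadJournal =
//     GrpcReadJournal(
//       GrpcQuerySettings("my-replicated-stream"),
//       GrpcClientSettings.fromConfig("my-replication-client"),
//       Some(replicationSettings))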

private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = {
// compose with potential user overrides to allow overriding our defaults
clientSettings.withChannelBuilderOverrides(channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides))
@@ -160,7 +191,7 @@ final class GrpcReadJournal private (
system: ExtendedActorSystem,
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protoAnySerialization: ProtoAnySerialization,
wireSerialization: AkkaProjectionGrpcSerialization,
replicationSettings: Option[ReplicationSettings[_]])
extends ReadJournal
with EventsBySliceQuery
@@ -424,7 +455,7 @@ final class GrpcReadJournal private (
// not the normal entity type which is internal to the producing side
streamId: String): EventEnvelope[Evt] = {
require(streamId == settings.streamId, s"Stream id mismatch, was [$streamId], expected [${settings.streamId}]")
ProtobufProtocolConversions.eventToEnvelope(event, protoAnySerialization)
ProtobufProtocolConversions.eventToEnvelope(event, wireSerialization)
}

private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = {
@@ -86,7 +86,8 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
"Use Transformation.identity to pass through each event as is.")
}

private val protoAnySerialization = new ProtoAnySerialization(system)
private val protoAnyWireSerialization = new ProtoAnySerialization(system)
private val akkaOnlyWireSerialization = new DelegateToAkkaSerialization(system)

private val streamIdToSourceMap: Map[String, EventProducer.EventProducerSource] =
sources.map(s => s.streamId -> s).toMap
@@ -97,6 +98,10 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
.map(s => s"(stream id: [${s.streamId}], entity type: [${s.entityType}])")
.mkString(", "))

private def wireSerialization(eps: EventProducer.EventProducerSource): AkkaProjectionGrpcSerialization =
if (eps.settings.akkaSerializationOnly) akkaOnlyWireSerialization
else protoAnyWireSerialization

private def intercept(streamId: String, metadata: Metadata): Future[Done] =
interceptor match {
case Some(interceptor) => interceptor.intercept(streamId, metadata)
@@ -229,7 +234,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
ProtobufProtocolConversions.offsetToProtoOffset(env.offset)))))
case _ =>
import system.executionContext
transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization)
transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource))
.map {
case Some(event) =>
if (log.isTraceEnabled)
@@ -307,7 +312,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
.flatMap(f => req.replicaInfo.map(f.createFilter))
.getOrElse((_: EventEnvelope[_]) => true)
if (eventOriginFilter(env)) {
transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization)
transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource))
.map {
case Some(event) =>
log.traceN(
@@ -65,7 +65,9 @@ private[akka] object EventPusher {
import akka.projection.grpc.internal.ProtobufProtocolConversions.transformAndEncodeEvent

implicit val ec: ExecutionContext = system.executionContext
val protoAnySerialization = new ProtoAnySerialization(system)
val wireSerialization =
if (eps.settings.akkaSerializationOnly) new DelegateToAkkaSerialization(system)
else new ProtoAnySerialization(system)

def filterAndTransformFlow(filters: Future[proto.ConsumerEventStart])
: Flow[(EventEnvelope[Event], ProjectionContext), (ConsumeEventIn, ProjectionContext), NotUsed] =
@@ -119,7 +121,7 @@ private[akka] object EventPusher {
envelope.sequenceNr,
startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]"))

transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization)
transformAndEncodeEvent(eps.transformation, envelope, wireSerialization)
} else {
if (logger.isTraceEnabled())
logger.trace(
@@ -7,7 +7,6 @@ package akka.projection.grpc.internal
import scala.collection.concurrent.TrieMap
import scala.collection.immutable
import scala.util.Try

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
@@ -23,6 +22,7 @@ import com.google.protobuf.Descriptors
import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.Message
import com.google.protobuf.Parser
import com.google.protobuf.UnsafeByteOperations
import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.{ Any => JavaPbAny }
import com.google.protobuf.{ Any => PbAny }
@@ -99,15 +99,45 @@ import scalapb.options.Scalapb
}
}

def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = {
if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId"
else
s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest"
}

def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = {
val idAndManifest =
typeUrl.substring(AkkaSerializationTypeUrlPrefix.length)
val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator)
if (i == -1)
idAndManifest.toInt -> ""
else
idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1)
}

}
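// Worked example of the two helpers above (the serializer id and manifest are made up):
//
//   val url = akkaSerializationTypeUrl(17, "com.example.OrderPlacedV2")
//   // => AkkaSerializationTypeUrlPrefix + "17" + AkkaTypeUrlManifestSeparator + "com.example.OrderPlacedV2"
//   akkaSerializerIdAndManifestFromTypeUrl(url)                              // => (17, "com.example.OrderPlacedV2")
//   akkaSerializerIdAndManifestFromTypeUrl(akkaSerializationTypeUrl(17, "")) // => (17, "")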

/**
* INTERNAL API
*/
@InternalApi private[akka] class ProtoAnySerialization(
@InternalApi
private[akka] trait AkkaProjectionGrpcSerialization {
def serialize(event: Any): ScalaPbAny
def deserialize(scalaPbAny: ScalaPbAny): Any
def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent]
}

/**
* Primarily intended for direct use of gRPC projections, where the producing service may publish a public
* (protobuf) protocol.
*
* INTERNAL API
*/
@InternalApi private[akka] final class ProtoAnySerialization(
system: ActorSystem[_],
descriptors: immutable.Seq[Descriptors.FileDescriptor],
prefer: ProtoAnySerialization.Prefer) {
prefer: ProtoAnySerialization.Prefer)
extends AkkaProjectionGrpcSerialization {
import ProtoAnySerialization._

private val serialization = SerializationExtension(system.classicSystem)
@@ -138,7 +168,7 @@ import scalapb.options.Scalapb
def this(system: ActorSystem[_]) =
this(system, descriptors = Nil, ProtoAnySerialization.Prefer.Scala)

def serialize(event: Any): ScalaPbAny = {
override def serialize(event: Any): ScalaPbAny = {
event match {
case scalaPbAny: ScalaPbAny if scalaPbAny.typeUrl.startsWith(GoogleTypeUrlPrefix) =>
ScalaPbAny(ProtoAnyTypeUrl, scalaPbAny.toByteString)
@@ -153,15 +183,15 @@ import scalapb.options.Scalapb
case other =>
// fallback to Akka serialization
val otherAnyRef = other.asInstanceOf[AnyRef]
val bytes = serialization.serialize(otherAnyRef).get
val serializer = serialization.findSerializerFor(otherAnyRef)
val bytes = serializer.toBinary(otherAnyRef)
val manifest = Serializers.manifestFor(serializer, otherAnyRef)
val id = serializer.identifier
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), ByteString.copyFrom(bytes))
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes))
}
}

def deserialize(scalaPbAny: ScalaPbAny): Any = {
override def deserialize(scalaPbAny: ScalaPbAny): Any = {
val typeUrl = scalaPbAny.typeUrl
if (typeUrl == ProtoAnyTypeUrl) {
if (prefer == Prefer.Scala)
@@ -172,6 +202,7 @@ import scalapb.options.Scalapb
decodeMessage(scalaPbAny)
} else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
// FIXME could potentially optimize to use byte buffer here instead of copy to byte array
serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get
} else if (prefer == Prefer.Scala) {
// when custom typeUrl
@@ -182,7 +213,7 @@ import scalapb.options.Scalapb
}
}

def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = {
override def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = {
// see corresponding typeUrl cases in `deserialize`

val typeUrl = scalaPbAny.typeUrl
@@ -224,22 +255,6 @@ import scalapb.options.Scalapb
}
}

private def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = {
if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId"
else
s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest"
}

private def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = {
val idAndManifest =
typeUrl.substring(AkkaSerializationTypeUrlPrefix.length)
val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator)
if (i == -1)
idAndManifest.toInt -> ""
else
idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1)
}

private def strippedFileName(fileName: String) =
fileName.split(Array('/', '\\')).last.stripSuffix(".proto")

@@ -451,3 +466,46 @@ import scalapb.options.Scalapb
}

}
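// Minimal usage sketch for the class above (internal API; `event` is a stand-in value): a generated
// protobuf message is passed through as protobuf Any, anything else falls back to Akka serialization.
//
//   val wire = new ProtoAnySerialization(system)
//   val pbAny: ScalaPbAny = wire.serialize(event)
//   val restored: Any = wire.deserialize(pbAny)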

/**
* Primarily intended for replication, where there is no public protocol.
*
* INTERNAL API
*/
@InternalApi
private[akka] final class DelegateToAkkaSerialization(system: ActorSystem[_]) extends AkkaProjectionGrpcSerialization {
import ProtoAnySerialization._
private val serialization = SerializationExtension(system.classicSystem)

override def serialize(event: Any): ScalaPbAny = {
val anyRefEvent = event.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(anyRefEvent)
val bytes = serializer.toBinary(anyRefEvent)
val manifest = Serializers.manifestFor(serializer, anyRefEvent)
val id = serializer.identifier
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes))
}

override def deserialize(event: ScalaPbAny): Any = {
val typeUrl = event.typeUrl
if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
// FIXME could potentially optimize to use byte buffer here instead of copy to byte array
serialization.deserialize(event.value.toByteArray, id, manifest).get
} else {
throw new IllegalArgumentException(
s"Got event with type url: [${typeUrl}] but only type urls with Akka serializer prefix ($AkkaSerializationTypeUrlPrefix) supported")
}
}

override def toSerializedEvent(event: ScalaPbAny): Option[SerializedEvent] = {
val typeUrl = event.typeUrl
if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
Some(new SerializedEvent(event.value.toByteArray, id, manifest))
} else {
// We don't try to optimize this case; it would mean one level of indirection too many, and it is probably not a common case.
None
}
}
}
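A minimal round-trip sketch for DelegateToAkkaSerialization above, assuming an event type that is bound to an Akka serializer in the application configuration; the class is internal API, so this illustrates the contract rather than something applications would call directly.

import akka.actor.typed.ActorSystem
import akka.projection.grpc.internal.DelegateToAkkaSerialization

object ReplicationWireSketch {
  // Stand-in event type; assumed to have an Akka serializer binding configured
  final case class OrderPlaced(orderId: String)

  def roundTrip(event: OrderPlaced)(implicit system: ActorSystem[_]): Unit = {
    val wire = new DelegateToAkkaSerialization(system)
    val pbAny = wire.serialize(event)                   // type url carries the Akka serializer id and manifest
    val restored = wire.deserialize(pbAny)              // resolves the serializer by id; non-Akka type urls throw
    val serializedEvent = wire.toSerializedEvent(pbAny) // Some(SerializedEvent(bytes, id, manifest)), no deserialization
    require(restored == event && serializedEvent.nonEmpty)
  }
}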
@@ -105,7 +105,7 @@ private[akka] object ProtobufProtocolConversions {
def transformAndEncodeEvent(
transformation: Transformation,
env: EventEnvelope[_],
protoAnySerialization: ProtoAnySerialization)(
protoAnySerialization: AkkaProjectionGrpcSerialization)(
implicit executionContext: ExecutionContext): Future[Option[Event]] = {
env.eventOption match {
case Some(_) =>
@@ -147,22 +147,22 @@
}
}

def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] =
def eventToEnvelope[Evt](event: Event, protoAnySerialization: AkkaProjectionGrpcSerialization): EventEnvelope[Evt] =
eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]]

def eventToEnvelope(
event: Event,
protoAnySerialization: ProtoAnySerialization,
wireSerialization: AkkaProjectionGrpcSerialization,
deserializeEvent: Boolean): EventEnvelope[Any] = {
val eventOffset = populateSeenIfNeeded(
TimestampOffset.toTimestampOffset(protocolOffsetToOffset(event.offset)),
event.persistenceId,
event.seqNr)

val metadata: Option[Any] = event.metadata.map(protoAnySerialization.deserialize)
val metadata: Option[Any] = event.metadata.map(wireSerialization.deserialize)

def envelopeWithDeserializedEvent: EventEnvelope[Any] = {
val evt = event.payload.map(protoAnySerialization.deserialize)
val evt = event.payload.map(wireSerialization.deserialize)
new EventEnvelope(
eventOffset,
event.persistenceId,
@@ -180,7 +180,7 @@ private[akka] object ProtobufProtocolConversions {
if (deserializeEvent || event.payload.isEmpty) {
envelopeWithDeserializedEvent
} else {
protoAnySerialization.toSerializedEvent(event.payload.get) match {
wireSerialization.toSerializedEvent(event.payload.get) match {
case Some(serializedEvent) =>
new EventEnvelope(
eventOffset,