Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Do not special handle protobuf payloads for replication #1212

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
Expand All @@ -28,6 +29,8 @@ import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.AkkaProjectionGrpcSerialization
import akka.projection.grpc.internal.DelegateToAkkaSerialization
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
Expand All @@ -53,14 +56,13 @@ import io.grpc.Status
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
Expand Down Expand Up @@ -144,6 +146,35 @@ object GrpcReadJournal {
replicationSettings)
}

/**
* INTERNAL API
*
* Factory method for replication, with replication specific serialization
*/
@InternalApi private[akka] def apply(
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
replicationSettings: Option[ReplicationSettings[_]])(implicit system: ActorSystem[_]): GrpcReadJournal = {

// FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance,
// and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings?

val wireSerialization = new DelegateToAkkaSerialization(system)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This apply is the same as the other apply above, aside from the serialization. Where is the other used? Do we need both? Is it even so that we can look at replicationSettings.isDefined to select serializer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified to a single internal apply factory now.


if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system.classicSystem.toTyped).ref ! ConsumerFilter.UpdateFilter(
settings.streamId,
settings.initialConsumerFilter)
}

new scaladsl.GrpcReadJournal(
system.classicSystem.asInstanceOf[ExtendedActorSystem],
settings,
withChannelBuilderOverrides(clientSettings),
wireSerialization,
replicationSettings)
}

private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = {
// compose with potential user overrides to allow overriding our defaults
clientSettings.withChannelBuilderOverrides(channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides))
Expand All @@ -160,7 +191,7 @@ final class GrpcReadJournal private (
system: ExtendedActorSystem,
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protoAnySerialization: ProtoAnySerialization,
wireSerialization: AkkaProjectionGrpcSerialization,
replicationSettings: Option[ReplicationSettings[_]])
extends ReadJournal
with EventsBySliceQuery
Expand Down Expand Up @@ -424,7 +455,7 @@ final class GrpcReadJournal private (
// not the normal entity type which is internal to the producing side
streamId: String): EventEnvelope[Evt] = {
require(streamId == settings.streamId, s"Stream id mismatch, was [$streamId], expected [${settings.streamId}]")
ProtobufProtocolConversions.eventToEnvelope(event, protoAnySerialization)
ProtobufProtocolConversions.eventToEnvelope(event, wireSerialization)
}

private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
"Use Transformation.identity to pass through each event as is.")
}

private val protoAnySerialization = new ProtoAnySerialization(system)
private val protoAnyWireSerialization = new ProtoAnySerialization(system)
private val akkaOnlyWireSerialization = new DelegateToAkkaSerialization(system)

private val streamIdToSourceMap: Map[String, EventProducer.EventProducerSource] =
sources.map(s => s.streamId -> s).toMap
Expand All @@ -97,6 +98,10 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
.map(s => s"(stream id: [${s.streamId}], entity type: [${s.entityType}])")
.mkString(", "))

private def wireSerialization(eps: EventProducer.EventProducerSource): AkkaProjectionGrpcSerialization =
if (eps.settings.akkaSerializationOnly) akkaOnlyWireSerialization
else protoAnyWireSerialization

private def intercept(streamId: String, metadata: Metadata): Future[Done] =
interceptor match {
case Some(interceptor) => interceptor.intercept(streamId, metadata)
Expand Down Expand Up @@ -229,7 +234,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
ProtobufProtocolConversions.offsetToProtoOffset(env.offset)))))
case _ =>
import system.executionContext
transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization)
transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource))
.map {
case Some(event) =>
if (log.isTraceEnabled)
Expand Down Expand Up @@ -307,7 +312,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
.flatMap(f => req.replicaInfo.map(f.createFilter))
.getOrElse((_: EventEnvelope[_]) => true)
if (eventOriginFilter(env)) {
transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization)
transformAndEncodeEvent(producerSource.transformation, env, wireSerialization(producerSource))
.map {
case Some(event) =>
log.traceN(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ private[akka] object EventPusher {
import akka.projection.grpc.internal.ProtobufProtocolConversions.transformAndEncodeEvent

implicit val ec: ExecutionContext = system.executionContext
val protoAnySerialization = new ProtoAnySerialization(system)
val wireSerialization =
if (eps.settings.akkaSerializationOnly) new DelegateToAkkaSerialization(system)
else new ProtoAnySerialization(system)

def filterAndTransformFlow(filters: Future[proto.ConsumerEventStart])
: Flow[(EventEnvelope[Event], ProjectionContext), (ConsumeEventIn, ProjectionContext), NotUsed] =
Expand Down Expand Up @@ -119,7 +121,7 @@ private[akka] object EventPusher {
envelope.sequenceNr,
startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]"))

transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization)
transformAndEncodeEvent(eps.transformation, envelope, wireSerialization)
} else {
if (logger.isTraceEnabled())
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.projection.grpc.internal
import scala.collection.concurrent.TrieMap
import scala.collection.immutable
import scala.util.Try

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
Expand All @@ -23,6 +22,7 @@ import com.google.protobuf.Descriptors
import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.Message
import com.google.protobuf.Parser
import com.google.protobuf.UnsafeByteOperations
import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.{ Any => JavaPbAny }
import com.google.protobuf.{ Any => PbAny }
Expand Down Expand Up @@ -99,15 +99,45 @@ import scalapb.options.Scalapb
}
}

def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = {
if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId"
else
s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest"
}

def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = {
val idAndManifest =
typeUrl.substring(AkkaSerializationTypeUrlPrefix.length)
val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator)
if (i == -1)
idAndManifest.toInt -> ""
else
idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1)
}

}

/**
* INTERNAL API
*/
@InternalApi private[akka] class ProtoAnySerialization(
@InternalApi
private[akka] trait AkkaProjectionGrpcSerialization {
def serialize(event: Any): ScalaPbAny
def deserialize(scalaPbAny: ScalaPbAny): Any
def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent]
}

/**
* Primarily intended for direct usage of grpc projections, where there may be a public (protobuf) protocol published
* by the producing service.
*
* INTERNAL API
*/
@InternalApi private[akka] final class ProtoAnySerialization(
system: ActorSystem[_],
descriptors: immutable.Seq[Descriptors.FileDescriptor],
prefer: ProtoAnySerialization.Prefer) {
prefer: ProtoAnySerialization.Prefer)
extends AkkaProjectionGrpcSerialization {
import ProtoAnySerialization._

private val serialization = SerializationExtension(system.classicSystem)
Expand Down Expand Up @@ -138,7 +168,7 @@ import scalapb.options.Scalapb
def this(system: ActorSystem[_]) =
this(system, descriptors = Nil, ProtoAnySerialization.Prefer.Scala)

def serialize(event: Any): ScalaPbAny = {
override def serialize(event: Any): ScalaPbAny = {
event match {
case scalaPbAny: ScalaPbAny if scalaPbAny.typeUrl.startsWith(GoogleTypeUrlPrefix) =>
ScalaPbAny(ProtoAnyTypeUrl, scalaPbAny.toByteString)
Expand All @@ -153,15 +183,15 @@ import scalapb.options.Scalapb
case other =>
// fallback to Akka serialization
val otherAnyRef = other.asInstanceOf[AnyRef]
val bytes = serialization.serialize(otherAnyRef).get
val serializer = serialization.findSerializerFor(otherAnyRef)
val bytes = serializer.toBinary(otherAnyRef)
val manifest = Serializers.manifestFor(serializer, otherAnyRef)
val id = serializer.identifier
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), ByteString.copyFrom(bytes))
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes))
}
}

def deserialize(scalaPbAny: ScalaPbAny): Any = {
override def deserialize(scalaPbAny: ScalaPbAny): Any = {
val typeUrl = scalaPbAny.typeUrl
if (typeUrl == ProtoAnyTypeUrl) {
if (prefer == Prefer.Scala)
Expand All @@ -172,6 +202,7 @@ import scalapb.options.Scalapb
decodeMessage(scalaPbAny)
} else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
// FIXME could potentially optimize to use byte buffer here instead of copy to byte array
serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get
} else if (prefer == Prefer.Scala) {
// when custom typeUrl
Expand All @@ -182,7 +213,7 @@ import scalapb.options.Scalapb
}
}

def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = {
override def toSerializedEvent(scalaPbAny: ScalaPbAny): Option[SerializedEvent] = {
// see corresponding typeUrl cases in `deserialize`

val typeUrl = scalaPbAny.typeUrl
Expand Down Expand Up @@ -224,22 +255,6 @@ import scalapb.options.Scalapb
}
}

private def akkaSerializationTypeUrl(serializerId: Int, manifest: String): String = {
if (manifest.isEmpty) s"$AkkaSerializationTypeUrlPrefix$serializerId"
else
s"$AkkaSerializationTypeUrlPrefix$serializerId$AkkaTypeUrlManifestSeparator$manifest"
}

private def akkaSerializerIdAndManifestFromTypeUrl(typeUrl: String): (Int, String) = {
val idAndManifest =
typeUrl.substring(AkkaSerializationTypeUrlPrefix.length)
val i = idAndManifest.indexOf(AkkaTypeUrlManifestSeparator)
if (i == -1)
idAndManifest.toInt -> ""
else
idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1)
}

private def strippedFileName(fileName: String) =
fileName.split(Array('/', '\\')).last.stripSuffix(".proto")

Expand Down Expand Up @@ -451,3 +466,46 @@ import scalapb.options.Scalapb
}

}

/**
* Primarily intended for replication where there is no public protocol
*
* INTERNAL API
*/
@InternalApi
private[akka] final class DelegateToAkkaSerialization(system: ActorSystem[_]) extends AkkaProjectionGrpcSerialization {
import ProtoAnySerialization._
private val serialization = SerializationExtension(system.classicSystem)

override def serialize(event: Any): ScalaPbAny = {
val anyRefEvent = event.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(anyRefEvent)
val bytes = serializer.toBinary(anyRefEvent)
val manifest = Serializers.manifestFor(serializer, anyRefEvent)
val id = serializer.identifier
ScalaPbAny(akkaSerializationTypeUrl(id, manifest), UnsafeByteOperations.unsafeWrap(bytes))
}

override def deserialize(event: ScalaPbAny): Any = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked in detail at this PR, but could we keep deserialization side as it was? If we serialize with AkkaSerializationTypeUrlPrefix that would still be deserialized correctly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if we can have no special casing that is better and less trickery to try to understand, all messages are fed to the configured akka serializer. The any-passthrough is such a weird special case that it is very unlikely that anybody else than us ourselves is doing that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

val typeUrl = event.typeUrl
if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
// FIXME could potentially optimize to use byte buffer here instead of copy to byte array
serialization.deserialize(event.value.toByteArray, id, manifest).get
} else {
throw new IllegalArgumentException(
s"Got event with type url: [${typeUrl}] but only type urls with Akka serializer prefix ($AkkaSerializationTypeUrlPrefix) supported")
}
}

override def toSerializedEvent(event: ScalaPbAny): Option[SerializedEvent] = {
val typeUrl = event.typeUrl
if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) {
val (id, manifest) = akkaSerializerIdAndManifestFromTypeUrl(typeUrl)
Some(new SerializedEvent(event.value.toByteArray, id, manifest))
} else {
// We don't try to optimize this case. One level of indirection too much, and probably not a common case.
None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private[akka] object ProtobufProtocolConversions {
def transformAndEncodeEvent(
transformation: Transformation,
env: EventEnvelope[_],
protoAnySerialization: ProtoAnySerialization)(
protoAnySerialization: AkkaProjectionGrpcSerialization)(
implicit executionContext: ExecutionContext): Future[Option[Event]] = {
env.eventOption match {
case Some(_) =>
Expand Down Expand Up @@ -147,22 +147,22 @@ private[akka] object ProtobufProtocolConversions {
}
}

def eventToEnvelope[Evt](event: Event, protoAnySerialization: ProtoAnySerialization): EventEnvelope[Evt] =
def eventToEnvelope[Evt](event: Event, protoAnySerialization: AkkaProjectionGrpcSerialization): EventEnvelope[Evt] =
eventToEnvelope(event, protoAnySerialization, deserializeEvent = true).asInstanceOf[EventEnvelope[Evt]]

def eventToEnvelope(
event: Event,
protoAnySerialization: ProtoAnySerialization,
wireSerialization: AkkaProjectionGrpcSerialization,
deserializeEvent: Boolean): EventEnvelope[Any] = {
val eventOffset = populateSeenIfNeeded(
TimestampOffset.toTimestampOffset(protocolOffsetToOffset(event.offset)),
event.persistenceId,
event.seqNr)

val metadata: Option[Any] = event.metadata.map(protoAnySerialization.deserialize)
val metadata: Option[Any] = event.metadata.map(wireSerialization.deserialize)

def envelopeWithDeserializedEvent: EventEnvelope[Any] = {
val evt = event.payload.map(protoAnySerialization.deserialize)
val evt = event.payload.map(wireSerialization.deserialize)
new EventEnvelope(
eventOffset,
event.persistenceId,
Expand All @@ -180,7 +180,7 @@ private[akka] object ProtobufProtocolConversions {
if (deserializeEvent || event.payload.isEmpty) {
envelopeWithDeserializedEvent
} else {
protoAnySerialization.toSerializedEvent(event.payload.get) match {
wireSerialization.toSerializedEvent(event.payload.get) match {
case Some(serializedEvent) =>
new EventEnvelope(
eventOffset,
Expand Down
Loading
Loading