diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala index ae6b5f0ce..248d3bbe3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala @@ -107,8 +107,12 @@ final class EventProducerPushDestination private ( /** * @param transformation A transformation to use for all events. */ - def withTransformation(transformation: Transformation): EventProducerPushDestination = + def withTransformation(transformation: Transformation): EventProducerPushDestination = { + require( + transformation ne Transformation.empty, + s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.") copy(transformationForOrigin = (_, _) => transformation) + } /** * @param transformation A function to create a transformation from the origin id and request metadata diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala index 9117201db..f4ebd97ad 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala @@ -16,7 +16,18 @@ import scala.compat.java8.OptionConverters._ import scala.reflect.ClassTag object Transformation { - val empty = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty) + + /** + * Starting point for building `Transformation`. Registrations of actual transformations must + * be added. Use [[Transformation.identity]] to pass through each event as is. + */ + val empty: Transformation = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty) + + /** + * No transformation. Pass through each event as is. + */ + val identity: Transformation = + new Transformation(scaladsl.EventProducerPushDestination.Transformation.identity) } /** diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala index fc67af20e..dc55c7f31 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala @@ -47,7 +47,7 @@ object EventProducerPushDestination { new EventProducerPushDestination( None, acceptedStreamId, - (_, _) => Transformation.empty, + (_, _) => Transformation.identity, None, immutable.Seq.empty, protobufDescriptors, @@ -55,7 +55,19 @@ object EventProducerPushDestination { @ApiMayChange object Transformation { - val empty: Transformation = new Transformation(Map.empty, Predef.identity) + + /** + * Starting point for building `Transformation`. Registrations of actual transformations must + * be added. Use [[Transformation.identity]] to pass through each event as is. + */ + val empty: Transformation = + new Transformation(Map.empty, Predef.identity) + + /** + * No transformation. Pass through each event as is. + */ + val identity: Transformation = + new Transformation(Map.empty, Predef.identity) } /** @@ -200,8 +212,12 @@ final class EventProducerPushDestination private[akka] ( /** * @param transformation A transformation to use for all events. */ - def withTransformation(transformation: Transformation): EventProducerPushDestination = + def withTransformation(transformation: Transformation): EventProducerPushDestination = { + require( + transformation ne Transformation.empty, + s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.") copy(transformationForOrigin = (_, _) => transformation) + } /** * @param transformation A function to create a transformation from the origin id and request metadata diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 07d7c7a0e..4ad535ae5 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -39,9 +39,10 @@ import com.google.protobuf.timestamp.Timestamp import io.grpc.Status import org.slf4j.Logger import org.slf4j.LoggerFactory - import scala.concurrent.Future +import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation + /** * INTERNAL API */ @@ -78,6 +79,10 @@ import scala.concurrent.Future eventsBySlicesPerStreamId.contains(s.streamId) || eventsBySlicesStartingFromSnapshotsPerStreamId.contains(s.streamId), s"No events by slices query defined for stream id [${s.streamId}]") + require( + s.transformation ne Transformation.empty, + s"Transformation is not defined for stream id [${s.streamId}]. " + + "Use Transformation.identity to pass through each event as is.") } private val protoAnySerialization = new ProtoAnySerialization(system) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 87e9d8b16..065b60e74 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -82,6 +82,9 @@ private[akka] final class EventPusherConsumerServiceImpl( } val transformer = destination.eventProducerPushDestination.transformationForOrigin(init.originId, metadata) + if (transformer eq EventProducerPushDestination.Transformation.empty) + throw new IllegalArgumentException( + s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.") // allow interceptor to block request based on metadata val interceptedTail = destination.eventProducerPushDestination.interceptor match { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala index 8c467305f..19244c1ab 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala @@ -25,6 +25,11 @@ trait Mapper[A, B] { @ApiMayChange object Transformation { + + /** + * Starting point for building `Transformation`. Registrations of actual transformations must + * be added. Use [[Transformation.identity]] to pass through each event as is. + */ val empty: Transformation = new Transformation(scaladsl.EventProducer.Transformation.empty) /** diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala index 74950d374..e8267dc89 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala @@ -105,10 +105,17 @@ object EventProducer { @ApiMayChange object Transformation { + /** + * Starting point for building `Transformation`. Registrations of actual transformations must + * be added. Use [[Transformation.identity]] to pass through each event as is. + */ val empty: Transformation = new Transformation( mappers = Map.empty, orElse = envelope => - Future.failed(new IllegalArgumentException(s"Missing transformation for event [${envelope.event.getClass}]"))) + Future.failed( + new IllegalArgumentException( + s"Missing transformation for event [${envelope.eventOption.map(_.getClass).getOrElse("")}]. " + + "Use Transformation.identity to pass through each event as is."))) /** * No transformation. Pass through each event as is. diff --git a/docs/src/main/paradox/grpc-producer-push.md b/docs/src/main/paradox/grpc-producer-push.md index fbe10ab3f..fe5f4faea 100644 --- a/docs/src/main/paradox/grpc-producer-push.md +++ b/docs/src/main/paradox/grpc-producer-push.md @@ -80,6 +80,8 @@ defines a single transformation to use for all producers, while @apidoc[EventPro is invoked with an origin id for the producer and additional metadata specified when setting up the producer and can provide transformations based on those. +Use @scala[`Transformation.identity`]@java[`Transformation.identity()`] to pass through each event as is. + The payload transformation also allows for arbitrary filtering logic, returning a @scala[None]@java[Optional.empty()] marks the event as filtered and avoids storing the payload in the consumer journal. diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md index 33cb0ed75..b6bcdf195 100644 --- a/docs/src/main/paradox/grpc.md +++ b/docs/src/main/paradox/grpc.md @@ -145,6 +145,8 @@ Java To omit an event the transformation function can return @scala[`None`]@java[`Optional.empty()`]. +Use @scala[`Transformation.identity`]@java[`Transformation.identity()`] to pass through each event as is. + That `EventProducer` service is started in an Akka gRPC server like this: Scala