Skip to content

Commit

Permalink
chore: Transformation.identity vs empty (#967)
Browse files Browse the repository at this point in the history
* empty and identity also in EventProducerPushDestination Transformation
* it's not mandatory in EventProducerPushDestination, but fail for exactly empty
* default for EventProducerPushDestination is identity
  • Loading branch information
patriknw authored Aug 30, 2023
1 parent 2df2920 commit ce6824b
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ final class EventProducerPushDestination private (
/**
* @param transformation A transformation to use for all events.
*/
def withTransformation(transformation: Transformation): EventProducerPushDestination =
def withTransformation(transformation: Transformation): EventProducerPushDestination = {
require(
transformation ne Transformation.empty,
s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.")
copy(transformationForOrigin = (_, _) => transformation)
}

/**
* @param transformation A function to create a transformation from the origin id and request metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@ import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag

object Transformation {
val empty = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty)

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty)

/**
* No transformation. Pass through each event as is.
*/
val identity: Transformation =
new Transformation(scaladsl.EventProducerPushDestination.Transformation.identity)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,27 @@ object EventProducerPushDestination {
new EventProducerPushDestination(
None,
acceptedStreamId,
(_, _) => Transformation.empty,
(_, _) => Transformation.identity,
None,
immutable.Seq.empty,
protobufDescriptors,
EventProducerPushDestinationSettings(system))

@ApiMayChange
object Transformation {
val empty: Transformation = new Transformation(Map.empty, Predef.identity)

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation =
new Transformation(Map.empty, Predef.identity)

/**
* No transformation. Pass through each event as is.
*/
val identity: Transformation =
new Transformation(Map.empty, Predef.identity)
}

/**
Expand Down Expand Up @@ -200,8 +212,12 @@ final class EventProducerPushDestination private[akka] (
/**
* @param transformation A transformation to use for all events.
*/
def withTransformation(transformation: Transformation): EventProducerPushDestination =
def withTransformation(transformation: Transformation): EventProducerPushDestination = {
require(
transformation ne Transformation.empty,
s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.")
copy(transformationForOrigin = (_, _) => transformation)
}

/**
* @param transformation A function to create a transformation from the origin id and request metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ import com.google.protobuf.timestamp.Timestamp
import io.grpc.Status
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.Future

import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -78,6 +79,10 @@ import scala.concurrent.Future
eventsBySlicesPerStreamId.contains(s.streamId) ||
eventsBySlicesStartingFromSnapshotsPerStreamId.contains(s.streamId),
s"No events by slices query defined for stream id [${s.streamId}]")
require(
s.transformation ne Transformation.empty,
s"Transformation is not defined for stream id [${s.streamId}]. " +
"Use Transformation.identity to pass through each event as is.")
}

private val protoAnySerialization = new ProtoAnySerialization(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ private[akka] final class EventPusherConsumerServiceImpl(
}
val transformer =
destination.eventProducerPushDestination.transformationForOrigin(init.originId, metadata)
if (transformer eq EventProducerPushDestination.Transformation.empty)
throw new IllegalArgumentException(
s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.")

// allow interceptor to block request based on metadata
val interceptedTail = destination.eventProducerPushDestination.interceptor match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ trait Mapper[A, B] {

@ApiMayChange
object Transformation {

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(scaladsl.EventProducer.Transformation.empty)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,17 @@ object EventProducer {
@ApiMayChange
object Transformation {

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(
mappers = Map.empty,
orElse = envelope =>
Future.failed(new IllegalArgumentException(s"Missing transformation for event [${envelope.event.getClass}]")))
Future.failed(
new IllegalArgumentException(
s"Missing transformation for event [${envelope.eventOption.map(_.getClass).getOrElse("")}]. " +
"Use Transformation.identity to pass through each event as is.")))

/**
* No transformation. Pass through each event as is.
Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/paradox/grpc-producer-push.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ defines a single transformation to use for all producers, while @apidoc[EventPro
is invoked with an origin id for the producer and additional metadata specified when setting up the producer and can provide
transformations based on those.

Use @scala[`Transformation.identity`]@java[`Transformation.identity()`] to pass through each event as is.

The payload transformation also allows for arbitrary filtering logic, returning a @scala[None]@java[Optional.empty()] marks
the event as filtered and avoids storing the payload in the consumer journal.

Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/paradox/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ Java

To omit an event the transformation function can return @scala[`None`]@java[`Optional.empty()`].

Use @scala[`Transformation.identity`]@java[`Transformation.identity()`] to pass through each event as is.

That `EventProducer` service is started in an Akka gRPC server like this:

Scala
Expand Down

0 comments on commit ce6824b

Please sign in to comment.