Skip to content

Commit

Permalink
empty and identity also in EventProducerPushDestination Transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Aug 29, 2023
1 parent b3344c0 commit 5920a9a
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ final class EventProducerPushDestination private (
/**
* @param transformation A transformation to use for all events.
*/
def withTransformation(transformation: Transformation): EventProducerPushDestination =
def withTransformation(transformation: Transformation): EventProducerPushDestination = {
require(
transformation ne Transformation.empty,
s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.")
copy(transformationForOrigin = (_, _) => transformation)
}

/**
* @param transformation A function to create a transformation from the origin id and request metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@ import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag

object Transformation {
val empty = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty)

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(scaladsl.EventProducerPushDestination.Transformation.empty)

/**
* No transformation. Pass through each event as is.
*/
val identity: Transformation =
new Transformation(scaladsl.EventProducerPushDestination.Transformation.identity)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,31 @@ object EventProducerPushDestination {
new EventProducerPushDestination(
None,
acceptedStreamId,
(_, _) => Transformation.empty,
(_, _) => Transformation.identity,
None,
immutable.Seq.empty,
protobufDescriptors,
EventProducerPushDestinationSettings(system))

@ApiMayChange
object Transformation {
val empty: Transformation = new Transformation(Map.empty, Predef.identity)

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(
typedMappers = Map.empty,
untypedMappers = envelope =>
throw new IllegalArgumentException(
s"Missing transformation for event [${envelope.eventOption.map(_.getClass).getOrElse("")}]. " +
"Use Transformation.identity to pass through each event as is."))

/**
* No transformation. Pass through each event as is.
*/
val identity: Transformation =
new Transformation(Map.empty, Predef.identity)
}

/**
Expand Down Expand Up @@ -200,8 +216,12 @@ final class EventProducerPushDestination private[akka] (
/**
* @param transformation A transformation to use for all events.
*/
def withTransformation(transformation: Transformation): EventProducerPushDestination =
def withTransformation(transformation: Transformation): EventProducerPushDestination = {
require(
transformation ne Transformation.empty,
s"Transformation must not be empty. Use Transformation.identity to pass through each event as is.")
copy(transformationForOrigin = (_, _) => transformation)
}

/**
* @param transformation A function to create a transformation from the origin id and request metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ trait Mapper[A, B] {

@ApiMayChange
object Transformation {

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(scaladsl.EventProducer.Transformation.empty)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ object EventProducer {
@ApiMayChange
object Transformation {

/**
* Starting point for building `Transformation`. Registrations of actual transformations must
* be added. Use [[Transformation.identity]] to pass through each event as is.
*/
val empty: Transformation = new Transformation(
mappers = Map.empty,
orElse = envelope =>
Future.failed(
new IllegalArgumentException(
s"Missing transformation for event [${envelope.event.getClass}]. " +
s"Missing transformation for event [${envelope.eventOption.map(_.getClass).getOrElse("")}]. " +
"Use Transformation.identity to pass through each event as is.")))

/**
Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/paradox/grpc-producer-push.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ defines a single transformation to use for all producers, while @apidoc[EventPro
is invoked with an origin id for the producer and additional metadata specified when setting up the producer and can provide
transformations based on those.

Use @scala[`Transformation.identity`]@java[`Transformation.identity()`] to pass through each event as is.

The payload transformation also allows for arbitrary filtering logic, returning a @scala[None]@java[Optional.empty()] marks
the event as filtered and avoids storing the payload in the consumer journal.

Expand Down

0 comments on commit 5920a9a

Please sign in to comment.