Skip to content

Commit

Permalink
feat: One more convenient/common producer transform signature (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Aug 31, 2023
1 parent 4410d9f commit 72614aa
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class EventProducerPushSpec(testContainerConf: TestContainerConf)
.registerPersistenceIdMapper(envelope => envelope.persistenceId.replace("p-", s"$originId-"))
.registerTagMapper[StringValue](_ => Set("added-tag"))
// uppercase and turn into String instead of wire-protocol protobuf message
.registerPayloadMapper[StringValue, String](env => env.eventOption.map(name => name.value.toUpperCase))
.registerEnvelopeMapper[StringValue, String](env => env.eventOption.map(name => name.value.toUpperCase))
}
.withConsumerFilters(Vector(ConsumerFilter.ExcludeEntityIds(Set(consumerFilterExcludedPid.id))))
// #consumerSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public static void start(ActorSystem<?> system) {
Transformation.empty()
.registerMapper(
Integer.class, event -> Optional.of(Integer.valueOf(event * 2).toString()))
.registerEnvelopeMapper(Long.class, envelope -> Optional.of(envelope.event() + 1L))
.registerOrElseMapper(event -> Optional.of(event.toString()));
Transformation lowLevel = Transformation.empty().registerAsyncEnvelopeMapper(
Integer.class, envelope -> CompletableFuture.completedFuture(envelope.getOptionalEvent())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
Future.successful(env.eventMetadata))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

"transform envelope" in {
val transformer = Transformation.empty.registerEnvelopeMapper((env: EventEnvelope[String]) => Some(env.event))
transformer(envelope("whatever", None)).futureValue should ===(Some("whatever"))
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# not yet in a published release
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.javadsl.Transformation.registerPayloadMapper")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination#Transformation.registerPayloadMapper")
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,25 @@ final class Transformation private (val delegate: scaladsl.EventProducerPushDest
}

/**
* Transform incoming event payloads.
*
* Events can be excluded by mapping the payload to `Optional.empty`.
*/
def registerMapper[A, B](inputEventClass: Class[A], f: JFunction[A, Optional[B]]): Transformation = {
implicit val ct: ClassTag[A] = ClassTag(inputEventClass)
new Transformation(delegate.registerEnvelopeMapper[A, B](envelope => f(envelope.event).asScala))
}

/**
* Transform incoming event payloads, with access to the entire envelope for additional metadata.
*
* Events can be excluded by mapping the payload `Optional.empty`.
*/
def registerPayloadMapper[A, B](
def registerEnvelopeMapper[A, B](
inputEventClass: Class[A],
f: JFunction[EventEnvelope[A], Optional[B]]): Transformation = {
implicit val ct: ClassTag[A] = ClassTag(inputEventClass)
new Transformation(delegate.registerPayloadMapper[A, B](envelope => f(envelope).asScala))
new Transformation(delegate.registerEnvelopeMapper[A, B](envelope => f(envelope).asScala))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,19 @@ object EventProducerPushDestination {
}

/**
* Transform incoming event payloads.
*
* Events can be excluded by mapping the payload to `None`.
*/
def registerPayloadMapper[A: ClassTag, B](f: EventEnvelope[A] => Option[B]): Transformation = {
def registerMapper[A: ClassTag, B](f: A => Option[B]): Transformation =
registerEnvelopeMapper[A, B](envelope => f(envelope.event))

/**
* Transform incoming event payloads, with access to the entire envelope for additional metadata.
*
* Events can be excluded by mapping the payload to `None`.
*/
def registerEnvelopeMapper[A: ClassTag, B](f: EventEnvelope[A] => Option[B]): Transformation = {
val clazz = implicitly[ClassTag[A]].runtimeClass
val mapPayload = { (eventEnvelope: EventEnvelope[Any]) =>
if (eventEnvelope.filtered) eventEnvelope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.producer.scaladsl

import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.{ Function => JFunction }
import scala.compat.java8.FutureConverters._
Expand Down Expand Up @@ -49,6 +50,10 @@ object Transformation {
@ApiMayChange
final class Transformation private[akka] (private[akka] val delegate: scaladsl.EventProducer.Transformation) {

/**
* @param f A function that is fed each event payload of type `A` and returns an
* async payload to emit, or `Optional.empty()` to filter the event from being produced.
*/
def registerAsyncMapper[A, B](
inputEventClass: Class[A],
f: JFunction[A, CompletionStage[Optional[B]]]): Transformation = {
Expand All @@ -57,11 +62,29 @@ final class Transformation private[akka] (private[akka] val delegate: scaladsl.E
delegate.registerAsyncMapper[A, B](event => f.apply(event).toScala.map(_.asScala)(ExecutionContexts.parasitic)))
}

/**
* @param f A function that is fed each event payload of type `A` and returns a
* payload to emit, or `Optional.empty()` to filter the event from being produced.
*/
def registerMapper[A, B](inputEventClass: Class[A], f: JFunction[A, Optional[B]]): Transformation = {
implicit val ct: ClassTag[A] = ClassTag(inputEventClass)
new Transformation(delegate.registerMapper[A, B](event => f.apply(event).asScala))
}

/**
* @param f A function that is fed each event envelope for payloads of type `A` and returns a
* payload to emit, or `Optional.empty()` to filter the event from being produced.
*/
def registerEnvelopeMapper[A, B](
inputEventClass: Class[A],
f: JFunction[EventEnvelope[A], Optional[B]]): Transformation = {
registerAsyncEnvelopeMapper[A, B](inputEventClass, envelope => CompletableFuture.completedFuture(f(envelope)))
}

/**
* @param f A function that is fed each event envelope for payloads of type `A` and returns an
* async payload to emit, or `Optional.empty()` to filter the event from being produced.
*/
def registerAsyncEnvelopeMapper[A, B](
inputEventClass: Class[A],
f: JFunction[EventEnvelope[A], CompletionStage[Optional[B]]]): Transformation = {
Expand All @@ -70,6 +93,11 @@ final class Transformation private[akka] (private[akka] val delegate: scaladsl.E
f.apply(envelope).toScala.map(_.asScala)(ExecutionContexts.parasitic)))
}

/**
* @param f A function that is fed each event payload, that did not match any other registered mappers, returns an
* async payload to emit, or `Optional.empty()` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerAsyncOrElseMapper(f: AnyRef => CompletionStage[Optional[AnyRef]]): Transformation = {
new Transformation(
delegate.registerAsyncOrElseMapper(
Expand All @@ -79,10 +107,20 @@ final class Transformation private[akka] (private[akka] val delegate: scaladsl.E
.map(_.asScala)(ExecutionContexts.parasitic)))
}

/**
* @param f A function that is fed each event payload, that did not match any other registered mappers, returns a
* payload to emit, or `Optional.empty()` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerOrElseMapper(f: AnyRef => Optional[AnyRef]): Transformation = {
new Transformation(delegate.registerOrElseMapper(event => f.apply(event.asInstanceOf[AnyRef]).asScala))
}

/**
* @param f A function that is fed each event envelope, that did not match any other registered mappers, returns an
* async payload to emit, or `Optional.empty()` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerAsyncEnvelopeOrElseMapper(
f: JFunction[EventEnvelope[Any], CompletionStage[Optional[Any]]]): Transformation = {
new Transformation(delegate.registerAsyncEnvelopeOrElseMapper(envelope =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,32 +134,64 @@ object EventProducer {
private[akka] val orElse: EventEnvelope[Any] => Future[Option[Any]]) {

/**
* @param f A function that is fed each event, and the possible additional metadata
* @param f A function that is fed each event envelope where the payload is of type `A` and returns an
* async payload to emit, or `None` to filter the event from being produced.
*/
def registerAsyncEnvelopeMapper[A: ClassTag, B](f: EventEnvelope[A] => Future[Option[B]]): Transformation = {
val clazz = implicitly[ClassTag[A]].runtimeClass
new Transformation(mappers.updated(clazz, f.asInstanceOf[EventEnvelope[Any] => Future[Option[Any]]]), orElse)
}

/**
* @param f A function that is fed each event payload of type `A` and returns an
* async payload to emit, or `None` to filter the event from being produced.
*/
def registerAsyncMapper[A: ClassTag, B](f: A => Future[Option[B]]): Transformation = {
val clazz = implicitly[ClassTag[A]].runtimeClass
new Transformation(
mappers.updated(clazz, (envelope: EventEnvelope[Any]) => f(envelope.event.asInstanceOf[A])),
orElse)
}

/**
* @param f A function that is fed each event payload of type `A` and returns a
* payload to emit, or `None` to filter the event from being produced.
*/
def registerMapper[A: ClassTag, B](f: A => Option[B]): Transformation = {
registerAsyncMapper[A, B](event => Future.successful(f(event)))
}

/**
* @param f A function that is fed each event envelope where the payload is of type `A` and returns a
* payload to emit, or `None` to filter the event from being produced.
*/
def registerEnvelopeMapper[A: ClassTag, B](f: EventEnvelope[A] => Option[B]): Transformation = {
registerAsyncEnvelopeMapper[A, B](event => Future.successful(f(event)))
}

/**
* @param f A function that is fed each event payload, that did not match any other registered mappers, returns an
* async payload to emit, or `None` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerAsyncOrElseMapper(f: Any => Future[Option[Any]]): Transformation = {
new Transformation(mappers, (envelope: EventEnvelope[Any]) => f(envelope.event))
}

/**
* @param f A function that is fed each event payload, that did not match any other registered mappers, returns a
* payload to emit, or `None` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerOrElseMapper(f: Any => Option[Any]): Transformation = {
registerAsyncOrElseMapper(event => Future.successful(f(event)))
}

/**
* @param m A function that is fed each event envelope, that did not match any other registered mappers, returns a
* payload to emit, or `None` to filter the event from being produced. Replaces any previous "orElse"
* mapper defined.
*/
def registerAsyncEnvelopeOrElseMapper(m: EventEnvelope[Any] => Future[Option[Any]]): Transformation = {
new Transformation(mappers, m)
}
Expand Down

0 comments on commit 72614aa

Please sign in to comment.