Skip to content

Commit

Permalink
env with methods
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 5, 2023
1 parent bf80c74 commit 85f3325
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static void withFilters(ActorSystem<?> system) {
EventProducerPushDestination destination =
EventProducerPushDestination.create("stream-id", protoDescriptors, system)
.withConsumerFilters(
Arrays.asList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature")))
Collections.singletonList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature")))
);
// #consumerFilters
}
Expand All @@ -94,7 +94,8 @@ public static void withTransformations(ActorSystem<?> system) {
EventProducerPushDestination.create("stream-id", protoDescriptors, system)
.withTransformationForOrigin((String originId, Metadata metadata) ->
Transformation.empty()
.registerPersistenceIdMapper(envelope -> envelope.persistenceId().replace("originalPrefix", "newPrefix"))
.registerPersistenceIdMapper(system, envelope ->
envelope.persistenceId().replace("originalPrefix", "newPrefix"))
.registerTagMapper(String.class, envelope -> {
Set<String> newTags = new HashSet<>();
newTags.addAll(envelope.getTags());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import java.util.function.{ Function => JFunction }
import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag

import akka.actor.typed.ActorSystem

object Transformation {

/**
Expand Down Expand Up @@ -49,8 +51,8 @@ final class Transformation private (val delegate: scaladsl.EventProducerPushDest
* same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers
* and break consuming projections.
*/
def registerPersistenceIdMapper(f: JFunction[EventEnvelope[Any], String]): Transformation = {
new Transformation(delegate.registerPersistenceIdMapper(f.apply))
def registerPersistenceIdMapper(system: ActorSystem[_], f: JFunction[EventEnvelope[Any], String]): Transformation = {
new Transformation(delegate.registerPersistenceIdMapper(f.apply)(system))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import akka.grpc.scaladsl.Metadata
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.FilteredPayload
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.consumer.ConsumerFilter.FilterCriteria
import akka.projection.grpc.consumer.EventProducerPushDestinationSettings
import akka.projection.grpc.internal.EventPusherConsumerServiceImpl
import akka.projection.grpc.internal.ProtoAnySerialization.Prefer
import akka.projection.grpc.internal.proto.EventConsumerServicePowerApiHandler
import com.google.protobuf.Descriptors

import scala.collection.immutable
import scala.reflect.ClassTag

import akka.persistence.Persistence
import akka.persistence.typed.PersistenceId

/**
* A passive consumer service for event producer push that can be bound as a gRPC endpoint accepting active producers
* pushing events, for example to run a projection piercing firewalls or NAT. Events are pushed directly into the
Expand Down Expand Up @@ -86,7 +87,7 @@ object EventProducerPushDestination {
val mapTags = { (eventEnvelope: EventEnvelope[Any]) =>
val newTags = f(eventEnvelope.asInstanceOf[EventEnvelope[A]])
if (newTags eq eventEnvelope.tags) eventEnvelope
else copyEnvelope(eventEnvelope)(tags = newTags)
else eventEnvelope.withTags(newTags)
}
appendMapper(clazz, mapTags)
}
Expand All @@ -96,11 +97,17 @@ object EventProducerPushDestination {
* same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers
* and break consuming projections.
*/
def registerPersistenceIdMapper(f: EventEnvelope[Any] => String): Transformation = {
def registerPersistenceIdMapper(f: EventEnvelope[Any] => String)(
implicit system: ActorSystem[_]): Transformation = {
val mapId = { (eventEnvelope: EventEnvelope[Any]) =>
val newPid = f(eventEnvelope)
if (newPid eq eventEnvelope.persistenceId) eventEnvelope
else copyEnvelope(eventEnvelope)(persistenceId = newPid)
if (newPid eq eventEnvelope.persistenceId)
eventEnvelope
else {
val entityType = PersistenceId.extractEntityType(newPid)
val slice = Persistence(system).sliceForPersistenceId(newPid)
eventEnvelope.withPersistenceId(newPid, entityType, slice)
}
}
// needs to be untyped since not mapping filtered events the same way will cause gaps in seqnrs
new Transformation(typedMappers, untypedMappers.andThen(mapId))
Expand All @@ -126,7 +133,7 @@ object EventProducerPushDestination {
else {
val newMaybePayload = f(eventEnvelope.asInstanceOf[EventEnvelope[A]])
if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope
else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload)
else eventEnvelope.withEventOption(newMaybePayload)
}
}
appendMapper(clazz, mapPayload)
Expand All @@ -141,7 +148,7 @@ object EventProducerPushDestination {
else {
val newMaybePayload = f(eventEnvelope)
if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope
else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload)
else eventEnvelope.withEventOption(newMaybePayload)
}
}
new Transformation(typedMappers, untypedMappers.andThen(anyPayloadMapper))
Expand All @@ -163,30 +170,6 @@ object EventProducerPushDestination {
untypedMappers)
}

private def copyEnvelope(original: EventEnvelope[Any])(
offset: Offset = original.offset,
persistenceId: String = original.persistenceId,
sequenceNr: Long = original.sequenceNr,
eventOption: Option[Any] = original.eventOption,
timestamp: Long = original.timestamp,
eventMetadata: Option[Any] = original.eventMetadata,
entityType: String = original.entityType,
slice: Int = original.slice,
filtered: Boolean = original.filtered,
source: String = original.source,
tags: Set[String] = original.tags): EventEnvelope[Any] =
new EventEnvelope[Any](
offset,
persistenceId,
sequenceNr,
eventOption,
timestamp,
eventMetadata,
entityType,
slice,
filtered,
source,
tags)
}

def grpcServiceHandler(eventConsumer: EventProducerPushDestination)(
Expand Down

0 comments on commit 85f3325

Please sign in to comment.