diff --git a/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java b/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java index 5fd7af66a..ef750d2d0 100644 --- a/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java +++ b/akka-projection-grpc-tests/src/test/java/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationCompileTest.java @@ -83,7 +83,7 @@ public static void withFilters(ActorSystem system) { EventProducerPushDestination destination = EventProducerPushDestination.create("stream-id", protoDescriptors, system) .withConsumerFilters( - Arrays.asList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature"))) + Collections.singletonList(new ConsumerFilter.IncludeTopics(Collections.singleton("myhome/groundfloor/+/temperature"))) ); // #consumerFilters } @@ -94,7 +94,8 @@ public static void withTransformations(ActorSystem system) { EventProducerPushDestination.create("stream-id", protoDescriptors, system) .withTransformationForOrigin((String originId, Metadata metadata) -> Transformation.empty() - .registerPersistenceIdMapper(envelope -> envelope.persistenceId().replace("originalPrefix", "newPrefix")) + .registerPersistenceIdMapper(system, envelope -> + envelope.persistenceId().replace("originalPrefix", "newPrefix")) .registerTagMapper(String.class, envelope -> { Set newTags = new HashSet<>(); newTags.addAll(envelope.getTags()); diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala index cb82a90f7..8a003c101 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/Transformation.scala @@ -15,6 +15,8 @@ import java.util.function.{ Function => JFunction } import scala.compat.java8.OptionConverters._ import scala.reflect.ClassTag +import akka.actor.typed.ActorSystem + object Transformation { /** @@ -49,8 +51,8 @@ final class Transformation private (val delegate: scaladsl.EventProducerPushDest * same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers * and break consuming projections. */ - def registerPersistenceIdMapper(f: JFunction[EventEnvelope[Any], String]): Transformation = { - new Transformation(delegate.registerPersistenceIdMapper(f.apply)) + def registerPersistenceIdMapper(system: ActorSystem[_], f: JFunction[EventEnvelope[Any], String]): Transformation = { + new Transformation(delegate.registerPersistenceIdMapper(f.apply)(system)) } /** diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala index d182bf73f..47510f53a 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventProducerPushDestination.scala @@ -10,7 +10,6 @@ import akka.grpc.scaladsl.Metadata import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.persistence.FilteredPayload -import akka.persistence.query.Offset import akka.persistence.query.typed.EventEnvelope import akka.projection.grpc.consumer.ConsumerFilter.FilterCriteria import akka.projection.grpc.consumer.EventProducerPushDestinationSettings @@ -18,10 +17,12 @@ import akka.projection.grpc.internal.EventPusherConsumerServiceImpl import akka.projection.grpc.internal.ProtoAnySerialization.Prefer import akka.projection.grpc.internal.proto.EventConsumerServicePowerApiHandler import com.google.protobuf.Descriptors - import scala.collection.immutable import scala.reflect.ClassTag +import akka.persistence.Persistence +import akka.persistence.typed.PersistenceId + /** * A passive consumer service for event producer push that can be bound as a gRPC endpoint accepting active producers * pushing events, for example to run a projection piercing firewalls or NAT. Events are pushed directly into the @@ -86,7 +87,7 @@ object EventProducerPushDestination { val mapTags = { (eventEnvelope: EventEnvelope[Any]) => val newTags = f(eventEnvelope.asInstanceOf[EventEnvelope[A]]) if (newTags eq eventEnvelope.tags) eventEnvelope - else copyEnvelope(eventEnvelope)(tags = newTags) + else eventEnvelope.withTags(newTags) } appendMapper(clazz, mapTags) } @@ -96,11 +97,17 @@ object EventProducerPushDestination { * same incoming persistence id to the same stored persistence id to not introduce gaps in the sequence numbers * and break consuming projections. */ - def registerPersistenceIdMapper(f: EventEnvelope[Any] => String): Transformation = { + def registerPersistenceIdMapper(f: EventEnvelope[Any] => String)( + implicit system: ActorSystem[_]): Transformation = { val mapId = { (eventEnvelope: EventEnvelope[Any]) => val newPid = f(eventEnvelope) - if (newPid eq eventEnvelope.persistenceId) eventEnvelope - else copyEnvelope(eventEnvelope)(persistenceId = newPid) + if (newPid eq eventEnvelope.persistenceId) + eventEnvelope + else { + val entityType = PersistenceId.extractEntityType(newPid) + val slice = Persistence(system).sliceForPersistenceId(newPid) + eventEnvelope.withPersistenceId(newPid, entityType, slice) + } } // needs to be untyped since not mapping filtered events the same way will cause gaps in seqnrs new Transformation(typedMappers, untypedMappers.andThen(mapId)) @@ -126,7 +133,7 @@ object EventProducerPushDestination { else { val newMaybePayload = f(eventEnvelope.asInstanceOf[EventEnvelope[A]]) if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope - else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload) + else eventEnvelope.withEventOption(newMaybePayload) } } appendMapper(clazz, mapPayload) @@ -141,7 +148,7 @@ object EventProducerPushDestination { else { val newMaybePayload = f(eventEnvelope) if (newMaybePayload eq eventEnvelope.eventOption) eventEnvelope - else copyEnvelope(eventEnvelope)(eventOption = newMaybePayload) + else eventEnvelope.withEventOption(newMaybePayload) } } new Transformation(typedMappers, untypedMappers.andThen(anyPayloadMapper)) @@ -163,30 +170,6 @@ object EventProducerPushDestination { untypedMappers) } - private def copyEnvelope(original: EventEnvelope[Any])( - offset: Offset = original.offset, - persistenceId: String = original.persistenceId, - sequenceNr: Long = original.sequenceNr, - eventOption: Option[Any] = original.eventOption, - timestamp: Long = original.timestamp, - eventMetadata: Option[Any] = original.eventMetadata, - entityType: String = original.entityType, - slice: Int = original.slice, - filtered: Boolean = original.filtered, - source: String = original.source, - tags: Set[String] = original.tags): EventEnvelope[Any] = - new EventEnvelope[Any]( - offset, - persistenceId, - sequenceNr, - eventOption, - timestamp, - eventMetadata, - entityType, - slice, - filtered, - source, - tags) } def grpcServiceHandler(eventConsumer: EventProducerPushDestination)(