From c50f8f3e898641c5fcb986bfce3401a93a08b8a7 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 6 Sep 2023 13:59:00 +0200
Subject: [PATCH] chore: use mapAsyncPartitioned instead of own stage (#989)

---
 .../internal/ParallelUpdatesFlowSpec.scala    | 101 ------------
 .../ParallelUpdatesFlow.excludes              |   4 +
 .../internal/ParallelUpdatesFlow.scala        | 131 ----------------
 .../internal/ReplicationImpl.scala            | 148 +++++++++---------
 4 files changed, 79 insertions(+), 305 deletions(-)
 delete mode 100644 akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlowSpec.scala
 create mode 100644 akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/ParallelUpdatesFlow.excludes
 delete mode 100644 akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlow.scala

diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlowSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlowSpec.scala
deleted file mode 100644
index 19e53ebd7..000000000
--- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlowSpec.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (C) 2009-2023 Lightbend Inc.
- */
-
-package akka.projection.grpc.replication.internal
-
-import akka.Done
-import akka.actor.testkit.typed.TestException
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.persistence.query.typed.EventEnvelope
-import akka.projection.internal.ProjectionContextImpl
-import akka.stream.scaladsl.Keep
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
-import akka.stream.testkit.scaladsl.TestSource
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpecLike
-
-import scala.concurrent.Promise
-
-class ParallelUpdatesFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with Matchers {
-
-  "The ParallelUpdatesFlow" should {
-
-    "process envelopes for different persistence ids in parallel" in {
-      val processProbe = createTestProbe[(String, Promise[Done])]()
-      val (sourceProbe, resultF) = TestSource()
-        .viaMat(new ParallelUpdatesFlow[AnyRef](4)({ envelope =>
-          val done = Promise[Done]()
-          processProbe ! ((envelope.persistenceId, done))
-          done.future
-        }))(Keep.left)
-        .toMat(Sink.seq)(Keep.both)
-        .run()
-
-      sourceProbe.sendNext(element("pid1"))
-      sourceProbe.sendNext(element("pid2"))
-      sourceProbe.sendNext(element("pid3"))
-      sourceProbe.sendNext(element("pid4"))
-
-      val messages = processProbe.receiveMessages(4)
-      sourceProbe.expectNoMessage() // no demand until previous completed
-      messages.foreach(_._2.success(Done))
-
-      sourceProbe.sendNext(element("pid5"))
-      sourceProbe.sendNext(element("pid6"))
-      sourceProbe.sendNext(element("pid7"))
-      sourceProbe.sendNext(element("pid8"))
-      val messages2 = processProbe.receiveMessages(4)
-      messages2.foreach(_._2.success(Done))
-
-      sourceProbe.sendComplete()
-      resultF.futureValue should have size (8)
-    }
-
-    "make sure to never have more than one envelope in flight for the same persistence id" in {
-      val processProbe = createTestProbe[(String, Promise[Done])]()
-      val (sourceProbe, resultF) = TestSource()
-        .viaMat(new ParallelUpdatesFlow[AnyRef](4)({ envelope =>
-          val done = Promise[Done]()
-          processProbe ! ((envelope.persistenceId, done))
-          done.future
-        }))(Keep.left)
-        .toMat(Sink.seq)(Keep.both)
-        .run()
-
-      sourceProbe.sendNext(element("pid1"))
-      sourceProbe.sendNext(element("pid1"))
-      sourceProbe.expectNoMessage() // no demand until previous pid1 completed
-      processProbe.receiveMessage()._2.success(Done)
-      processProbe.receiveMessage()._2.success(Done)
-
-      sourceProbe.sendNext(element("pid3"))
-      sourceProbe.sendNext(element("pid4"))
-
-      processProbe.receiveMessages(2).foreach(_._2.success(Done))
-
-      sourceProbe.sendComplete()
-      resultF.futureValue should have size (4)
-    }
-
-    "handle immediate complete" in {
-      Source.empty
-        .via(new ParallelUpdatesFlow[AnyRef](4)({ _ => throw TestException("boom") }))
-        .runWith(Sink.ignore)
-        .futureValue
-    }
-
-    "fail if future throws" in {
-      Source
-        .single(element("pid1"))
-        .via(new ParallelUpdatesFlow[AnyRef](4)({ _ => throw TestException("boom") }))
-        .runWith(Sink.ignore)
-        .failed
-        .futureValue
-    }
-  }
-
-  private def element(pid: String) =
-    (EventEnvelope[AnyRef](null, pid, -1L, null, -1L, "", 0), ProjectionContextImpl(null, null, null, -1))
-}
diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/ParallelUpdatesFlow.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/ParallelUpdatesFlow.excludes
new file mode 100644
index 000000000..c8ea43390
--- /dev/null
+++ b/akka-projection-grpc/src/main/mima-filters/1.5.0-M3.backwards.excludes/ParallelUpdatesFlow.excludes
@@ -0,0 +1,4 @@
+# internals
+ProblemFilters.exclude[MissingClassProblem]("akka.projection.grpc.replication.internal.ParallelUpdatesFlow")
+ProblemFilters.exclude[MissingClassProblem]("akka.projection.grpc.replication.internal.ParallelUpdatesFlow$")
+ProblemFilters.exclude[MissingClassProblem]("akka.projection.grpc.replication.internal.ParallelUpdatesFlow$Holder")
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlow.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlow.scala
deleted file mode 100644
index f92ce22d9..000000000
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ParallelUpdatesFlow.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (C) 2009-2023 Lightbend Inc.
- */
-
-package akka.projection.grpc.replication.internal
-
-import akka.Done
-import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
-import akka.persistence.query.typed.EventEnvelope
-import akka.projection.ProjectionContext
-import akka.stream.Attributes
-import akka.stream.FlowShape
-import akka.stream.Inlet
-import akka.stream.Outlet
-import akka.stream.stage.GraphStage
-import akka.stream.stage.GraphStageLogic
-import akka.stream.stage.InHandler
-import akka.stream.stage.OutHandler
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-
-/**
- * INTERNAL API
- */
-@InternalApi
-private[akka] object ParallelUpdatesFlow {
-  final class Holder[T](val element: (EventEnvelope[T], ProjectionContext), var completed: Boolean) {
-    def persistenceId: String = element._1.persistenceId
-    def envelope: EventEnvelope[T] = element._1
-  }
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi
-private[akka] final class ParallelUpdatesFlow[T](parallelism: Int)(f: EventEnvelope[T] => Future[Done])
-    extends GraphStage[FlowShape[(EventEnvelope[T], ProjectionContext), (EventEnvelope[T], ProjectionContext)]] {
-  import ParallelUpdatesFlow._
-
-  // Simpler version of MapAsyncPartitioned until that is ready, better than just mapAsync(1) but second element
-  // for the same persistence id will block pulling elements for other pids until the original one has completed
-  // FIXME replace with MapAsyncPartitioned once available
-
-  val in = Inlet[(EventEnvelope[T], ProjectionContext)]("in")
-  val out = Outlet[(EventEnvelope[T], ProjectionContext)]("out")
-  val shape = FlowShape(in, out)
-
-  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
-
-    private var blockedByInFlight: Option[Holder[T]] = None
-    private val inFlight = mutable.Queue[Holder[T]]()
-
-    private val onCompleteCallback = getAsyncCallback(onComplete).invoke(_)
-
-    private def onComplete(result: Try[String]): Unit = result match {
-      case Success(persistenceId) =>
-        inFlight.find(_.persistenceId == persistenceId).get.completed = true
-        emitHeadIfPossible()
-
-      case Failure(ex) => throw ex
-    }
-
-    setHandler(
-      in,
-      new InHandler {
-        override def onPush(): Unit = {
-          val holder = new Holder[T](grab(in), false)
-          if (inFlight.exists(_.persistenceId == holder.persistenceId)) {
-            blockedByInFlight = Some(holder)
-          } else {
-            inFlight.enqueue(holder)
-            processElement(holder)
-            pullNextIfPossible()
-          }
-        }
-
-        override def onUpstreamFinish(): Unit = {
-          if (inFlight.isEmpty) completeStage()
-          // else keep going and complete once queue is empty
-        }
-      })
-
-    setHandler(out, new OutHandler {
-      override def onPull(): Unit = {
-        if (inFlight.nonEmpty && inFlight.head.completed) {
-          emitHeadIfPossible()
-        } else {
-          pullNextIfPossible()
-        }
-      }
-    })
-
-    private def processElement(holder: Holder[T]): Unit = {
-      f(holder.envelope)
-        .map(_ => holder.persistenceId)(ExecutionContexts.parasitic)
-        .onComplete(onCompleteCallback)(ExecutionContexts.parasitic)
-    }
-
-    private def emitHeadIfPossible(): Unit = {
-      if (inFlight.head.completed && isAvailable(out)) {
-        val head = inFlight.dequeue()
-        push(out, head.element)
-        blockedByInFlight match {
-          case Some(blocked) =>
-            if (blocked.persistenceId == head.persistenceId) {
-              // we're now unblocked
-              blockedByInFlight = None
-              processElement(blocked)
-              inFlight.enqueue(blocked)
-              pullNextIfPossible()
-            }
-          case None =>
-            pullNextIfPossible()
-        }
-        if (isClosed(in) && inFlight.isEmpty) completeStage()
-      }
-    }
-
-    private def pullNextIfPossible(): Unit = {
-      if (blockedByInFlight.isEmpty && inFlight.size < parallelism && !isClosed(in) && !hasBeenPulled(in)) {
-        pull(in)
-      }
-    }
-  }
-}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
index d137ed12d..7ff123045 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
@@ -4,6 +4,9 @@
 
 package akka.projection.grpc.replication.internal
 
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
 import akka.Done
 import akka.NotUsed
 import akka.actor.ExtendedActorSystem
@@ -11,6 +14,7 @@ import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.LoggerOps
 import akka.annotation.InternalApi
 import akka.cluster.ClusterActorRefProvider
+import akka.cluster.sharding.typed.ClusterShardingSettings
 import akka.cluster.sharding.typed.ReplicatedEntity
 import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
 import akka.cluster.sharding.typed.scaladsl.ClusterSharding
@@ -41,10 +45,6 @@ import akka.projection.grpc.replication.scaladsl.ReplicationSettings
 import akka.stream.scaladsl.FlowWithContext
 import akka.util.Timeout
 import org.slf4j.LoggerFactory
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-
-import akka.cluster.sharding.typed.ClusterShardingSettings
 /**
  * INTERNAL API
  */
@@ -157,77 +157,79 @@ private[akka] object ReplicationImpl {
         case Some(role) => defaultWithShardingSettings.withRole(role)
       }
     }
 
-    ShardedDaemonProcess(system).init(projectionName, remoteReplica.numberOfConsumers, { idx =>
-      val sliceRange = sliceRanges(idx)
-      val projectionKey = s"${sliceRange.min}-${sliceRange.max}"
-      val projectionId = ProjectionId(projectionName, projectionKey)
-
-      val replicationFlow: FlowWithContext[EventEnvelope[AnyRef], ProjectionContext, Done, ProjectionContext, NotUsed] =
-        FlowWithContext[EventEnvelope[AnyRef], ProjectionContext]
-          .via(new ParallelUpdatesFlow[AnyRef](settings.parallelUpdates)({
-            envelope =>
-
-              if (!envelope.filtered) {
-                envelope.eventMetadata match {
-                  case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
-                    // skipping events originating from other replicas is handled by filtering but for good measure
-                    require(replicatedEventMetadata.originReplica == remoteReplica.replicaId)
-
-                    val replicationId = ReplicationId.fromString(envelope.persistenceId)
-                    val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
-                    val entityRef =
-                      entityRefFactory(destinationReplicaId.entityId).asInstanceOf[EntityRef[PublishedEvent]]
-                    if (log.isTraceEnabled) {
-                      log.traceN(
-                        "[{}] forwarding event originating on dc [{}] to [{}] (origin seq_nr [{}]): [{}]",
-                        projectionKey,
-                        replicatedEventMetadata.originReplica,
-                        destinationReplicaId.persistenceId.id,
-                        envelope.sequenceNr,
-                        replicatedEventMetadata.version)
-                    }
-                    val askResult = entityRef.ask[Done](replyTo =>
-                      PublishedEventImpl(
-                        replicationId.persistenceId,
-                        replicatedEventMetadata.originSequenceNr,
-                        envelope.event,
-                        envelope.timestamp,
-                        Some(new ReplicatedPublishedEventMetaData(
-                          replicatedEventMetadata.originReplica,
-                          replicatedEventMetadata.version)),
-                        Some(replyTo)))
-                    askResult.failed.foreach(error =>
-                      log.warn(
-                        s"Failing replication stream [$projectionName/$projectionKey] from [${remoteReplica.replicaId.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]",
-                        error))
-                    askResult
-
-                  case unexpected =>
-                    throw new IllegalArgumentException(
-                      s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
-                      ", is the remote entity really a Replicated Event Sourced Entity?")
-                }
-              } else {
-                // Events not originating on sending side already are filtered/have no payload and end up here
-                if (log.isTraceEnabled)
-                  log.traceN(
-                    "[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])",
-                    projectionKey,
-                    remoteReplica.replicaId,
-                    envelope.persistenceId,
-                    envelope.sequenceNr)
-                Future.successful(Done)
-              }
-          }))
-          .map(_ => Done)
-
-      val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
-        system,
-        eventsBySlicesQuery,
-        eventsBySlicesQuery.streamId,
-        sliceRange.min,
-        sliceRange.max)
-      ProjectionBehavior(settings.projectionProvider(projectionId, sourceProvider, replicationFlow, system))
+    ShardedDaemonProcess(system).init(projectionName, remoteReplica.numberOfConsumers, {
+      idx =>
+        val sliceRange = sliceRanges(idx)
+        val projectionKey = s"${sliceRange.min}-${sliceRange.max}"
+        val projectionId = ProjectionId(projectionName, projectionKey)
+
+        val replicationFlow
+            : FlowWithContext[EventEnvelope[AnyRef], ProjectionContext, Done, ProjectionContext, NotUsed] =
+          FlowWithContext[EventEnvelope[AnyRef], ProjectionContext]
+            .mapAsyncPartitioned(parallelism = settings.parallelUpdates, perPartition = 1)(envelope =>
+              envelope.persistenceId) {
+              case (envelope, _) =>
+                if (!envelope.filtered) {
+                  envelope.eventMetadata match {
+                    case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
+                      // skipping events originating from other replicas is handled by filtering but for good measure
+                      require(replicatedEventMetadata.originReplica == remoteReplica.replicaId)
+
+                      val replicationId = ReplicationId.fromString(envelope.persistenceId)
+                      val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
+                      val entityRef =
+                        entityRefFactory(destinationReplicaId.entityId).asInstanceOf[EntityRef[PublishedEvent]]
+                      if (log.isTraceEnabled) {
+                        log.traceN(
+                          "[{}] forwarding event originating on dc [{}] to [{}] (origin seq_nr [{}]): [{}]",
+                          projectionKey,
+                          replicatedEventMetadata.originReplica,
+                          destinationReplicaId.persistenceId.id,
+                          envelope.sequenceNr,
+                          replicatedEventMetadata.version)
+                      }
+                      val askResult = entityRef.ask[Done](replyTo =>
+                        PublishedEventImpl(
+                          replicationId.persistenceId,
+                          replicatedEventMetadata.originSequenceNr,
+                          envelope.event,
+                          envelope.timestamp,
+                          Some(new ReplicatedPublishedEventMetaData(
+                            replicatedEventMetadata.originReplica,
+                            replicatedEventMetadata.version)),
+                          Some(replyTo)))
+                      askResult.failed.foreach(error =>
+                        log.warn(
+                          s"Failing replication stream [$projectionName/$projectionKey] from [${remoteReplica.replicaId.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]",
+                          error))
+                      askResult
+
+                    case unexpected =>
+                      throw new IllegalArgumentException(
+                        s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
+                        ", is the remote entity really a Replicated Event Sourced Entity?")
+                  }
+                } else {
+                  // Events not originating on sending side already are filtered/have no payload and end up here
+                  if (log.isTraceEnabled)
+                    log.traceN(
+                      "[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])",
+                      projectionKey,
+                      remoteReplica.replicaId,
+                      envelope.persistenceId,
+                      envelope.sequenceNr)
+                  Future.successful(Done)
+                }
+            }
+            .map(_ => Done)
+
+        val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
+          system,
+          eventsBySlicesQuery,
+          eventsBySlicesQuery.streamId,
+          sliceRange.min,
+          sliceRange.max)
+        ProjectionBehavior(settings.projectionProvider(projectionId, sourceProvider, replicationFlow, system))
     }, shardedDaemonProcessSettings, Some(ProjectionBehavior.Stop))
   }
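
Note on the replacement (not part of the patch itself): the deleted ParallelUpdatesFlow stage was an interim "simpler version of MapAsyncPartitioned" (see the FIXME in the removed file), guaranteeing at most one in-flight update per persistence id while letting different persistence ids proceed in parallel. The built-in mapAsyncPartitioned operator used in the new ReplicationImpl code provides the same guarantee through its partitioner. Below is a minimal, self-contained sketch of that call shape; the Event type, the pid keys and the ActorSystem setup are illustrative only, and it assumes an Akka Streams version that ships mapAsyncPartitioned with the (parallelism, perPartition) parameter lists used in the diff.

import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object MapAsyncPartitionedSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  // Illustrative stand-in for the (EventEnvelope, ProjectionContext) elements in the real flow.
  final case class Event(persistenceId: String, seqNr: Long)

  val events = List(
    Event("pid-a", 1),
    Event("pid-a", 2), // started only after ("pid-a", 1) completes
    Event("pid-b", 1), // may run concurrently with pid-a
    Event("pid-c", 1))

  Source(events)
    // Same call shape as in the patch: partition by persistence id, at most one
    // element in flight per id (perPartition = 1), up to 4 ids processed at once.
    .mapAsyncPartitioned(parallelism = 4, perPartition = 1)(_.persistenceId) { (event, _) =>
      // Stand-in for the entityRef.ask that applies the replicated event.
      Future(s"applied ${event.persistenceId} seq_nr ${event.seqNr}")
    }
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}

With perPartition = 1 the second pid-a element is held back until the first completes, while pid-b and pid-c proceed concurrently. That is the ordering property the removed ParallelUpdatesFlowSpec asserted, so dropping the custom GraphStage and its tests in favor of the maintained operator loses no behavior.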