From 4e09c7bf6965ad410360b61f7c77f8cdfae64f82 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Wed, 4 Mar 2020 07:31:24 +0100 Subject: [PATCH] Update zio, zio-streams, zio-test, ... to 1.0.0-RC18 (#53) * Update zio, zio-streams, zio-test, ... to 1.0.0-RC18 * Update code for ZIO RC18 Co-authored-by: Pierre Ricadat --- README.md | 65 +++--- build.sbt | 8 +- src/main/scala/zio/akka/cluster/Cluster.scala | 28 ++- .../zio/akka/cluster/pubsub/PubSub.scala | 14 +- .../zio/akka/cluster/sharding/Sharding.scala | 14 +- .../scala/zio/akka/cluster/ClusterSpec.scala | 69 +++---- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 162 +++++++-------- .../akka/cluster/sharding/ShardingSpec.scala | 194 +++++++++--------- 8 files changed, 269 insertions(+), 285 deletions(-) diff --git a/README.md b/README.md index a001053..d4d05dd 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,13 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/cluster-usage.htm You can also manually join a cluster using `Cluster.join`. ```scala -def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] +def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] ``` It's possible to get the status of the cluster by calling `Cluster.clusterState` ```scala -val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] +val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] ``` To monitor the cluster and be informed of changes (e.g. new members, member unreachable, etc), use `Cluster.clusterEvents`. @@ -50,13 +50,13 @@ To unsubscribe, simply `shutdown` the queue. `initialStateAsEvents` indicates if you want to receive previous cluster events leading to the current state, or only future events. ```scala -def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] +def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] ``` Finally, you can leave the current cluster using `Cluster.leave`. ```scala -val leave: ZIO[ActorSystem, Throwable, Unit] +val leave: ZIO[Has[ActorSystem], Throwable, Unit] ``` ### Akka PubSub @@ -71,7 +71,7 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/distributed-pub-s To create a `PubSub` object which can both publish and subscribe, use `PubSub.createPubSub`. ```scala -def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] +def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] ``` There are also less powerful variants `PubSub.createPublisher` if you only need to publish and `PubSub.createSubscriber` if you only need to subscribe. 
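The split variants follow the same `Has[ActorSystem]` pattern as `createPubSub`. A minimal sketch (assuming `listen` and `publish` are exposed on the `Subscriber` and `Publisher` traits, as in the combined example below; the `actorSystem` layer here is illustrative):

```scala
import akka.actor.ActorSystem
import zio.{ Has, Managed, Task, ZLayer }
import zio.akka.cluster.pubsub.PubSub

// Illustrative layer supplying the ActorSystem dependency.
val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

(for {
  publisher  <- PubSub.createPublisher[String]  // publish-only variant
  subscriber <- PubSub.createSubscriber[String] // subscribe-only variant
  queue      <- subscriber.listen("my-topic")
  _          <- publisher.publish("my-topic", "yo")
  firstMsg   <- queue.take
} yield firstMsg).provideLayer(actorSystem)
```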
@@ -107,16 +107,18 @@ This library wraps messages inside of a `zio.akka.cluster.pubsub.MessageEnvelope ```scala import akka.actor.ActorSystem -import zio.Task +import zio.{ Has, Managed, Task, ZLayer } import zio.akka.cluster.pubsub.PubSub -for { - actorSystem <- Task(ActorSystem("Test")) - pubSub <- PubSub.createPubSub[String].provide(actorSystem) - queue <- pubSub.listen("my-topic") - _ <- pubSub.publish("my-topic", "yo") - firstMsg <- queue.take -} yield firstMsg +val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) + +(for { + pubSub <- PubSub.createPubSub[String] + queue <- pubSub.listen("my-topic") + _ <- pubSub.publish("my-topic", "yo") + firstMsg <- queue.take +} yield firstMsg).provideLayer(actorSystem) ``` ### Akka Cluster Sharding @@ -137,7 +139,7 @@ def start[Msg, State]( name: String, onMessage: Msg => ZIO[Entity[State], Nothing, Unit], numberOfShards: Int = 100 - ): ZIO[ActorSystem, Throwable, Sharding[Msg]] + ): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] ``` It requires: @@ -167,20 +169,23 @@ This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelo ```scala import akka.actor.ActorSystem -import zio.{Task, ZIO} -import zio.akka.cluster.sharding.{Entity, Sharding} - -for { - actorSystem <- Task(ActorSystem("Test")) - behavior = (msg: String) => msg match { - case "+" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1))).unit - case "-" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1))).unit - case _ => ZIO.unit - } - sharding <- Sharding.start("session", behavior).provide(actorSystem) - entityId = "1" - _ <- sharding.send(entityId, "+") - _ <- sharding.send(entityId, "+") - _ <- sharding.send(entityId, "-") -} yield () +import zio.akka.cluster.sharding.{ Entity, Sharding } +import zio.{ Has, Managed, Task, ZIO, ZLayer } + +val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) + +val behavior: String => ZIO[Entity[Int], Nothing, Unit] = { + case "+" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1))) + case "-" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1))) + case _ => ZIO.unit +} + +(for { + sharding <- Sharding.start("session", behavior) + entityId = "1" + _ <- sharding.send(entityId, "+") + _ <- sharding.send(entityId, "+") + _ <- sharding.send(entityId, "-") +} yield ()).provideLayer(actorSystem) ``` diff --git a/build.sbt b/build.sbt index 210d81e..5e4c8c3 100644 --- a/build.sbt +++ b/build.sbt @@ -24,12 +24,12 @@ developers := List( ) libraryDependencies ++= Seq( - "dev.zio" %% "zio" % "1.0.0-RC17", - "dev.zio" %% "zio-streams" % "1.0.0-RC17", + "dev.zio" %% "zio" % "1.0.0-RC18", + "dev.zio" %% "zio-streams" % "1.0.0-RC18", "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.29", "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.29", - "dev.zio" %% "zio-test" % "1.0.0-RC17" % "test", - "dev.zio" %% "zio-test-sbt" % "1.0.0-RC17" % "test", + "dev.zio" %% "zio-test" % "1.0.0-RC18" % "test", + "dev.zio" %% "zio-test-sbt" % "1.0.0-RC18" % "test", compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.3"), compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1") ) diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index 43e0be9..6745a23 100644 --- 
a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -1,28 +1,22 @@ package zio.akka.cluster import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props } -import akka.cluster.ClusterEvent.{ - ClusterDomainEvent, - CurrentClusterState, - InitialStateAsEvents, - InitialStateAsSnapshot, - SubscriptionInitialStateMode -} +import akka.cluster.ClusterEvent._ import zio.Exit.{ Failure, Success } -import zio.{ Queue, Runtime, Task, ZIO } +import zio.{ Has, Queue, Runtime, Task, ZIO } object Cluster { - private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] = + private val cluster: ZIO[Has[ActorSystem], Throwable, akka.cluster.Cluster] = for { - actorSystem <- ZIO.environment[ActorSystem] + actorSystem <- ZIO.access[Has[ActorSystem]](_.get) cluster <- Task(akka.cluster.Cluster(actorSystem)) } yield cluster /** * Returns the current state of the cluster. */ - val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] = + val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] = for { cluster <- cluster state <- Task(cluster.state) @@ -31,7 +25,7 @@ object Cluster { /** * Joins a cluster using the provided seed nodes. */ - def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] = + def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] = for { cluster <- cluster _ <- Task(cluster.joinSeedNodes(seedNodes)) @@ -40,7 +34,7 @@ object Cluster { /** * Leaves the current cluster. */ - val leave: ZIO[ActorSystem, Throwable, Unit] = + val leave: ZIO[Has[ActorSystem], Throwable, Unit] = for { cluster <- cluster _ <- Task(cluster.leave(cluster.selfAddress)) @@ -52,7 +46,9 @@ object Cluster { * To unsubscribe, use `queue.shutdown`. * To use a bounded queue, see `clusterEventsWith`. */ - def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] = + def clusterEvents( + initialStateAsEvents: Boolean = false + ): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] = Queue.unbounded[ClusterDomainEvent].tap(clusterEventsWith(_, initialStateAsEvents)) /** @@ -63,10 +59,10 @@ object Cluster { def clusterEventsWith( queue: Queue[ClusterDomainEvent], initialStateAsEvents: Boolean = false - ): ZIO[ActorSystem, Throwable, Unit] = + ): ZIO[Has[ActorSystem], Throwable, Unit] = for { rts <- Task.runtime - actorSystem <- ZIO.environment[ActorSystem] + actorSystem <- ZIO.access[Has[ActorSystem]](_.get) _ <- Task(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) } yield () diff --git a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala index 56d143d..2aacde1 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala @@ -3,7 +3,7 @@ package zio.akka.cluster.pubsub import akka.actor.{ ActorRef, ActorSystem } import akka.cluster.pubsub.DistributedPubSub import zio.akka.cluster.pubsub.impl.{ PublisherImpl, SubscriberImpl } -import zio.{ Queue, Task, ZIO } +import zio.{ Has, Queue, Task, ZIO } /** * A `Publisher[A]` is able to send messages of type `A` through Akka PubSub. @@ -35,9 +35,9 @@ object PubSub { /** * Creates a new `Publisher[A]`. 
*/ - def createPublisher[A]: ZIO[ActorSystem, Throwable, Publisher[A]] = + def createPublisher[A]: ZIO[Has[ActorSystem], Throwable, Publisher[A]] = for { - actorSystem <- ZIO.environment[ActorSystem] + actorSystem <- ZIO.access[Has[ActorSystem]](_.get) mediator <- getMediator(actorSystem) } yield new Publisher[A] with PublisherImpl[A] { override val getMediator: ActorRef = mediator @@ -46,9 +46,9 @@ object PubSub { /** * Creates a new `Subscriber[A]`. */ - def createSubscriber[A]: ZIO[ActorSystem, Throwable, Subscriber[A]] = + def createSubscriber[A]: ZIO[Has[ActorSystem], Throwable, Subscriber[A]] = for { - actorSystem <- ZIO.environment[ActorSystem] + actorSystem <- ZIO.access[Has[ActorSystem]](_.get) mediator <- getMediator(actorSystem) } yield new Subscriber[A] with SubscriberImpl[A] { override val getActorSystem: ActorSystem = actorSystem @@ -58,9 +58,9 @@ object PubSub { /** * Creates a new `PubSub[A]`. */ - def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] = + def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] = for { - actorSystem <- ZIO.environment[ActorSystem] + actorSystem <- ZIO.access[Has[ActorSystem]](_.get) mediator <- getMediator(actorSystem) } yield new PubSub[A] with PublisherImpl[A] with SubscriberImpl[A] { override val getActorSystem: ActorSystem = actorSystem diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index 18d5bd4..4afcd37 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorContext, ActorRef, ActorSystem, PoisonPill, Prop import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings } import zio.akka.cluster.sharding import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PoisonPillPayload } -import zio.{ Ref, Runtime, Task, UIO, ZIO } +import zio.{ Has, Ref, Runtime, Task, UIO, ZIO } /** * A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one. 
@@ -31,10 +31,10 @@ object Sharding { name: String, onMessage: Msg => ZIO[Entity[State], Nothing, Unit], numberOfShards: Int = 100 - ): ZIO[ActorSystem, Throwable, Sharding[Msg]] = + ): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] = for { - rts <- ZIO.runtime[ActorSystem] - actorSystem = rts.environment + rts <- ZIO.runtime[Has[ActorSystem]] + actorSystem = rts.environment.get shardingRegion <- Task( ClusterSharding(actorSystem).start( typeName = name, @@ -68,10 +68,10 @@ object Sharding { name: String, role: Option[String], numberOfShards: Int = 100 - ): ZIO[ActorSystem, Throwable, Sharding[Msg]] = + ): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] = for { - rts <- ZIO.runtime[ActorSystem] - actorSystem = rts.environment + rts <- ZIO.runtime[Has[ActorSystem]] + actorSystem = rts.environment.get shardingRegion <- Task( ClusterSharding(actorSystem).startProxy( typeName = name, diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index eb357c3..93976be 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -3,44 +3,43 @@ package zio.akka.cluster import akka.actor.ActorSystem import akka.cluster.ClusterEvent.MemberLeft import com.typesafe.config.{ Config, ConfigFactory } -import zio.{ Managed, Task } -import zio.test._ import zio.test.Assertion._ +import zio.test._ +import zio.test.environment.TestEnvironment +import zio.{ Managed, Task, ZLayer } + +object ClusterSpec extends DefaultRunnableSpec { -object ClusterSpec - extends DefaultRunnableSpec( - suite("ClusterSpec")( - testM("receive cluster events") { - val config: Config = ConfigFactory.parseString(s""" - |akka { - | actor { - | provider = "cluster" - | } - | remote { - | netty.tcp { - | hostname = "127.0.0.1" - | port = 2551 - | } - | } - | cluster { - | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] - | } - |} + def spec: ZSpec[TestEnvironment, Any] = + suite("ClusterSpec")( + testM("receive cluster events") { + val config: Config = ConfigFactory.parseString(s""" + |akka { + | actor { + | provider = "cluster" + | } + | remote { + | netty.tcp { + | hostname = "127.0.0.1" + | port = 2551 + | } + | } + | cluster { + | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] + | } + |} """.stripMargin) - val actorSystem: Managed[Throwable, ActorSystem] = - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + val actorSystem: Managed[Throwable, ActorSystem] = + Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) - assertM( - actorSystem.use(actorSystem => - (for { - queue <- Cluster.clusterEvents() - _ <- Cluster.leave - item <- queue.take - } yield item).provide(actorSystem) - ), - isSubtype[MemberLeft](anything) - ) - } - ) + assertM( + for { + queue <- Cluster.clusterEvents() + _ <- Cluster.leave + item <- queue.take + } yield item + )(isSubtype[MemberLeft](anything)).provideLayer(ZLayer.fromManaged(actorSystem)) + } ) +} diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index def0e3a..e32e76c 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -2,91 +2,12 @@ package zio.akka.cluster.pubsub import akka.actor.ActorSystem import com.typesafe.config.{ Config, ConfigFactory } -import zio.{ Managed, Task } -import zio.akka.cluster.pubsub.PubSubSpecUtil._ import 
zio.test.Assertion._ import zio.test._ +import zio.test.environment.TestEnvironment +import zio.{ Has, Managed, Task, ZLayer } -object PubSubSpec - extends DefaultRunnableSpec( - suite("PubSubSpec")( - testM("send and receive a single message") { - assertM( - pubSub.use(pubSub => - for { - queue <- pubSub.listen(topic) - _ <- pubSub.publish(topic, msg) - item <- queue.take - } yield item - ), - equalTo(msg) - ) - }, - testM("support multiple subscribers") { - assertM( - pubSub.use(pubSub => - for { - queue1 <- pubSub.listen(topic) - queue2 <- pubSub.listen(topic) - _ <- pubSub.publish(topic, msg) - item1 <- queue1.take - item2 <- queue2.take - } yield (item1, item2) - ), - equalTo((msg, msg)) - ) - }, - testM("support multiple publishers") { - val msg2 = "what's up" - assertM( - pubSub.use(pubSub => - for { - queue <- pubSub.listen(topic) - _ <- pubSub.publish(topic, msg) - _ <- pubSub.publish(topic, msg2) - item1 <- queue.take - item2 <- queue.take - } yield (item1, item2) - ), - equalTo((msg, msg2)) - ) - }, - testM("send only one message to a single group") { - val group = "group" - assertM( - pubSub.use(pubSub => - for { - queue1 <- pubSub.listen(topic, Some(group)) - queue2 <- pubSub.listen(topic, Some(group)) - _ <- pubSub.publish(topic, msg, sendOneMessageToEachGroup = true) - item <- queue1.take race queue2.take - sizes <- queue1.size zip queue2.size - } yield (item, sizes) - ), - equalTo((msg, (0, 0))) - ) - }, - testM("send one message to each group") { - val group1 = "group1" - val group2 = "group2" - assertM( - pubSub.use(pubSub => - for { - queue1 <- pubSub.listen(topic, Some(group1)) - queue2 <- pubSub.listen(topic, Some(group2)) - _ <- pubSub.publish(topic, msg, sendOneMessageToEachGroup = true) - item1 <- queue1.take - item2 <- queue2.take - } yield List(item1, item2) - ), - equalTo(List(msg, msg)) - ) - } - ), - List(TestAspect.executionStrategy(ExecutionStrategy.Sequential)) - ) - -object PubSubSpecUtil { +object PubSubSpec extends DefaultRunnableSpec { val config: Config = ConfigFactory.parseString(s""" |akka { @@ -105,11 +26,80 @@ object PubSubSpecUtil { |} """.stripMargin) - val actorSystem: Managed[Throwable, ActorSystem] = - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) - - val pubSub: Managed[Throwable, PubSub[String]] = actorSystem.mapM(PubSub.createPubSub[String].provide) + val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + ZLayer.fromManaged( + Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ) val topic = "topic" val msg = "yo" + + def spec: ZSpec[TestEnvironment, Any] = + suite("PubSubSpec")( + testM("send and receive a single message") { + assertM( + for { + pubSub <- PubSub.createPubSub[String] + queue <- pubSub.listen(topic) + _ <- pubSub.publish(topic, msg) + item <- queue.take + } yield item + )(equalTo(msg)).provideLayer(actorSystem) + }, + testM("support multiple subscribers") { + assertM( + for { + pubSub <- PubSub.createPubSub[String] + queue1 <- pubSub.listen(topic) + queue2 <- pubSub.listen(topic) + _ <- pubSub.publish(topic, msg) + item1 <- queue1.take + item2 <- queue2.take + } yield (item1, item2) + )(equalTo((msg, msg))).provideLayer(actorSystem) + }, + testM("support multiple publishers") { + val msg2 = "what's up" + assertM( + for { + pubSub <- PubSub.createPubSub[String] + queue <- pubSub.listen(topic) + _ <- pubSub.publish(topic, msg) + _ <- pubSub.publish(topic, msg2) + item1 <- queue.take + item2 <- queue.take + } yield 
(item1, item2) + )(equalTo((msg, msg2))).provideLayer(actorSystem) + }, + testM("send only one message to a single group") { + val group = "group" + assertM( + for { + pubSub <- PubSub.createPubSub[String] + queue1 <- pubSub.listen(topic, Some(group)) + queue2 <- pubSub.listen(topic, Some(group)) + _ <- pubSub.publish(topic, msg, sendOneMessageToEachGroup = true) + item <- queue1.take race queue2.take + sizes <- queue1.size zip queue2.size + } yield (item, sizes) + )(equalTo((msg, (0, 0)))).provideLayer(actorSystem) + }, + testM("send one message to each group") { + val group1 = "group1" + val group2 = "group2" + assertM( + for { + pubSub <- PubSub.createPubSub[String] + queue1 <- pubSub.listen(topic, Some(group1)) + queue2 <- pubSub.listen(topic, Some(group2)) + _ <- pubSub.publish(topic, msg, sendOneMessageToEachGroup = true) + item1 <- queue1.take + item2 <- queue2.take + } yield List(item1, item2) + )(equalTo(List(msg, msg))).provideLayer(actorSystem) + } + ) + + override def aspects: List[TestAspect[Nothing, TestEnvironment, Nothing, Any]] = + List(TestAspect.executionStrategy(ExecutionStrategy.Sequential)) } diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index b89a338..495a48f 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -3,107 +3,14 @@ package zio.akka.cluster.sharding import scala.language.postfixOps import akka.actor.ActorSystem import com.typesafe.config.{ Config, ConfigFactory } -import zio.akka.cluster.sharding.ShardingSpecUtil._ import zio.clock.Clock import zio.duration._ import zio.test.Assertion._ import zio.test._ -import zio.{ Promise, ZIO } +import zio.test.environment.TestEnvironment +import zio.{ Has, Managed, Promise, Task, ZIO, ZLayer } -object ShardingSpec - extends DefaultRunnableSpec( - suite("ShardingSpec")( - testM("send and receive a single message") { - assertM( - actorSystem.use(sys => - for { - p <- Promise.make[Nothing, String] - onMessage = (msg: String) => p.succeed(msg).unit - sharding <- Sharding.start(shardName, onMessage).provide(sys) - _ <- sharding.send(shardId, msg) - res <- p.await - } yield res - ), - equalTo(msg) - ) - }, - testM("gather state") { - assertM( - actorSystem.use { - sys => - for { - p <- Promise.make[Nothing, Boolean] - onMessage = (_: String) => - for { - state <- ZIO.access[Entity[Int]](_.state) - newState <- state.update { - case None => Some(1) - case Some(x) => Some(x + 1) - } - _ <- ZIO.when(newState.contains(3))(p.succeed(true)) // complete the promise after the 3rd message - } yield () - sharding <- Sharding.start(shardName, onMessage).provide(sys) - _ <- sharding.send(shardId, msg) - _ <- sharding.send(shardId, msg) - earlyPoll <- p.poll - _ <- sharding.send(shardId, msg) - res <- p.await - } yield (earlyPoll, res) - }, - equalTo((None, true)) - ) - }, - testM("kill itself") { - assertM( - actorSystem.use { - sys => - for { - p <- Promise.make[Nothing, Option[Unit]] - onMessage = (msg: String) => - msg match { - case "set" => ZIO.accessM[Entity[Unit]](_.state.set(Some(()))) - case "get" => ZIO.accessM[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit)) - case "die" => ZIO.accessM[Entity[Unit]](_.stop) - } - sharding <- Sharding.start(shardName, onMessage).provide(sys) - _ <- sharding.send(shardId, "set") - _ <- sharding.send(shardId, "die") - _ <- ZIO - .sleep(3 seconds) - .provide(Clock.Live) // give time to the ShardCoordinator to 
notice the death of the actor and recreate one - _ <- sharding.send(shardId, "get") - res <- p.await - } yield res - }, - isNone - ) - }, - testM("work with 2 actor systems") { - assertM( - (actorSystem zip actorSystem2).use { - case (sys1, sys2) => - for { - p1 <- Promise.make[Nothing, Unit] - p2 <- Promise.make[Nothing, Unit] - onMessage1 = (_: String) => p1.succeed(()).unit - onMessage2 = (_: String) => p2.succeed(()).unit - sharding1 <- Sharding.start(shardName, onMessage1).provide(sys1) - _ <- Sharding.start(shardName, onMessage2).provide(sys2) - _ <- sharding1.send("1", "hi") - _ <- sharding1.send("2", "hi") - _ <- p1.await - _ <- p2.await - } yield () - }, - isUnit - ) - } - ), - List(TestAspect.executionStrategy(ExecutionStrategy.Sequential)) - ) - -object ShardingSpecUtil { - import zio._ +object ShardingSpec extends DefaultRunnableSpec { val config: Config = ConfigFactory.parseString(s""" |akka { @@ -123,8 +30,10 @@ object ShardingSpecUtil { |} """.stripMargin) - val actorSystem: Managed[Throwable, ActorSystem] = - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + ZLayer.fromManaged( + Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ) val config2: Config = ConfigFactory.parseString(s""" |akka { @@ -144,10 +53,95 @@ object ShardingSpecUtil { |} """.stripMargin) - val actorSystem2: Managed[Throwable, ActorSystem] = - Managed.make(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) + val actorSystem2: ZLayer[Any, Throwable, Has[ActorSystem]] = + ZLayer.fromManaged( + Managed.make(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ) val shardId = "shard" val shardName = "name" val msg = "yo" + + def spec: ZSpec[TestEnvironment, Any] = + suite("ShardingSpec")( + testM("send and receive a single message") { + assertM( + for { + p <- Promise.make[Nothing, String] + onMessage = (msg: String) => p.succeed(msg).unit + sharding <- Sharding.start(shardName, onMessage) + _ <- sharding.send(shardId, msg) + res <- p.await + } yield res + )(equalTo(msg)).provideLayer(actorSystem) + }, + testM("gather state") { + assertM( + for { + p <- Promise.make[Nothing, Boolean] + onMessage = (_: String) => + for { + state <- ZIO.access[Entity[Int]](_.state) + newState <- state.updateAndGet { + case None => Some(1) + case Some(x) => Some(x + 1) + } + _ <- ZIO.when(newState.contains(3))(p.succeed(true)) // complete the promise after the 3rd message + } yield () + sharding <- Sharding.start(shardName, onMessage) + _ <- sharding.send(shardId, msg) + _ <- sharding.send(shardId, msg) + earlyPoll <- p.poll + _ <- sharding.send(shardId, msg) + res <- p.await + } yield (earlyPoll, res) + )(equalTo((None, true))).provideLayer(actorSystem) + }, + testM("kill itself") { + assertM( + for { + p <- Promise.make[Nothing, Option[Unit]] + onMessage = (msg: String) => + msg match { + case "set" => ZIO.accessM[Entity[Unit]](_.state.set(Some(()))) + case "get" => ZIO.accessM[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit)) + case "die" => ZIO.accessM[Entity[Unit]](_.stop) + } + sharding <- Sharding.start(shardName, onMessage) + _ <- sharding.send(shardId, "set") + _ <- sharding.send(shardId, "die") + _ <- ZIO + .sleep(3 seconds) + .provideLayer( + Clock.live + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- 
sharding.send(shardId, "get") + res <- p.await + } yield res + )(isNone).provideLayer(actorSystem) + }, + testM("work with 2 actor systems") { + assertM( + actorSystem.build.use(a1 => + actorSystem2.build.use(a2 => + for { + p1 <- Promise.make[Nothing, Unit] + p2 <- Promise.make[Nothing, Unit] + onMessage1 = (_: String) => p1.succeed(()).unit + onMessage2 = (_: String) => p2.succeed(()).unit + sharding1 <- Sharding.start(shardName, onMessage1).provideLayer(ZLayer.succeedMany(a1)) + _ <- Sharding.start(shardName, onMessage2).provideLayer(ZLayer.succeedMany(a2)) + _ <- sharding1.send("1", "hi") + _ <- sharding1.send("2", "hi") + _ <- p1.await + _ <- p2.await + } yield () + ) + ) + )(isUnit) + } + ) + + override def aspects: List[TestAspect[Nothing, TestEnvironment, Nothing, Any]] = + List(TestAspect.executionStrategy(ExecutionStrategy.Sequential)) }
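
One RC18 change worth noting from the `gather state` test above: the entity's `Ref` is now read back with `updateAndGet` instead of `update`, since `update` no longer returns the new value in RC18. A standalone sketch of that pattern, outside the sharding entity (names are illustrative):

```scala
import zio.{ Ref, UIO }

// RC18: use `updateAndGet` when the code needs the value produced by the update.
def bumpAndRead(counter: Ref[Option[Int]]): UIO[Option[Int]] =
  counter.updateAndGet {
    case None    => Some(1)
    case Some(x) => Some(x + 1)
  }

val program: UIO[Option[Int]] = Ref.make(Option.empty[Int]).flatMap(bumpAndRead)
```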
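Across every file in this patch the RC18 migration follows the same three moves: the environment type `ActorSystem` becomes `Has[ActorSystem]`, the service is read with `ZIO.access[Has[ActorSystem]](_.get)` instead of `ZIO.environment[ActorSystem]`, and callers supply it with `provideLayer` on a `ZLayer` instead of `provide` on a raw `ActorSystem`. A minimal sketch of the pattern in isolation (the `useActorSystem` and `live` names are illustrative, not part of the library):

```scala
import akka.actor.ActorSystem
import zio.{ Has, Managed, Task, ZIO, ZLayer }

// An effect that needs an ActorSystem from the ZIO environment.
val useActorSystem: ZIO[Has[ActorSystem], Throwable, String] =
  for {
    system <- ZIO.access[Has[ActorSystem]](_.get) // RC18: unwrap the service from Has
    name   <- Task(system.name)
  } yield name

// The dependency is built once as a layer and provided with `provideLayer`.
val live: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val program: ZIO[Any, Throwable, String] = useActorSystem.provideLayer(live)
```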