Skip to content

Commit

Permalink
Update zio, zio-streams, zio-test, ... to 1.0.0-RC18 (#53)
Browse files Browse the repository at this point in the history
* Update zio, zio-streams, zio-test, ... to 1.0.0-RC18

* Update code for ZIO RC18

Co-authored-by: Pierre Ricadat <[email protected]>
  • Loading branch information
scala-steward and ghostdogpr authored Mar 4, 2020
1 parent 0c00970 commit 4e09c7b
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 285 deletions.
65 changes: 35 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/cluster-usage.htm
You can also manually join a cluster using `Cluster.join`.

```scala
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit]
def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit]
```

It's possible to get the status of the cluster by calling `Cluster.clusterState`

```scala
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState]
val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState]
```

To monitor the cluster and be informed of changes (e.g. new members, member unreachable, etc), use `Cluster.clusterEvents`.
Expand All @@ -50,13 +50,13 @@ To unsubscribe, simply `shutdown` the queue.
`initialStateAsEvents` indicates if you want to receive previous cluster events leading to the current state, or only future events.

```scala
def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]]
def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]]
```

Finally, you can leave the current cluster using `Cluster.leave`.

```scala
val leave: ZIO[ActorSystem, Throwable, Unit]
val leave: ZIO[Has[ActorSystem], Throwable, Unit]
```

### Akka PubSub
Expand All @@ -71,7 +71,7 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/distributed-pub-s
To create a `PubSub` object which can both publish and subscribe, use `PubSub.createPubSub`.

```scala
def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]]
def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]]
```

There are also less powerful variants `PubSub.createPublisher` if you only need to publish and `PubSub.createSubscriber` if you only need to subscribe.
Expand Down Expand Up @@ -107,16 +107,18 @@ This library wraps messages inside of a `zio.akka.cluster.pubsub.MessageEnvelope

```scala
import akka.actor.ActorSystem
import zio.Task
import zio.{ Has, Managed, Task, ZLayer }
import zio.akka.cluster.pubsub.PubSub

for {
actorSystem <- Task(ActorSystem("Test"))
pubSub <- PubSub.createPubSub[String].provide(actorSystem)
queue <- pubSub.listen("my-topic")
_ <- pubSub.publish("my-topic", "yo")
firstMsg <- queue.take
} yield firstMsg
val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

(for {
pubSub <- PubSub.createPubSub[String]
queue <- pubSub.listen("my-topic")
_ <- pubSub.publish("my-topic", "yo")
firstMsg <- queue.take
} yield firstMsg).provideLayer(actorSystem)
```

### Akka Cluster Sharding
Expand All @@ -137,7 +139,7 @@ def start[Msg, State](
name: String,
onMessage: Msg => ZIO[Entity[State], Nothing, Unit],
numberOfShards: Int = 100
): ZIO[ActorSystem, Throwable, Sharding[Msg]]
): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]]
```

It requires:
Expand Down Expand Up @@ -167,20 +169,23 @@ This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelo

```scala
import akka.actor.ActorSystem
import zio.{Task, ZIO}
import zio.akka.cluster.sharding.{Entity, Sharding}

for {
actorSystem <- Task(ActorSystem("Test"))
behavior = (msg: String) => msg match {
case "+" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1))).unit
case "-" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1))).unit
case _ => ZIO.unit
}
sharding <- Sharding.start("session", behavior).provide(actorSystem)
entityId = "1"
_ <- sharding.send(entityId, "+")
_ <- sharding.send(entityId, "+")
_ <- sharding.send(entityId, "-")
} yield ()
import zio.akka.cluster.sharding.{ Entity, Sharding }
import zio.{ Has, Managed, Task, ZIO, ZLayer }

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val behavior: String => ZIO[Entity[Int], Nothing, Unit] = {
case "+" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1)))
case "-" => ZIO.accessM[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1)))
case _ => ZIO.unit
}

(for {
sharding <- Sharding.start("session", behavior)
entityId = "1"
_ <- sharding.send(entityId, "+")
_ <- sharding.send(entityId, "+")
_ <- sharding.send(entityId, "-")
} yield ()).provideLayer(actorSystem)
```
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ developers := List(
)

libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "1.0.0-RC17",
"dev.zio" %% "zio-streams" % "1.0.0-RC17",
"dev.zio" %% "zio" % "1.0.0-RC18",
"dev.zio" %% "zio-streams" % "1.0.0-RC18",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.29",
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.29",
"dev.zio" %% "zio-test" % "1.0.0-RC17" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC17" % "test",
"dev.zio" %% "zio-test" % "1.0.0-RC18" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC18" % "test",
compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.3"),
compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
)
Expand Down
28 changes: 12 additions & 16 deletions src/main/scala/zio/akka/cluster/Cluster.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
package zio.akka.cluster

import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props }
import akka.cluster.ClusterEvent.{
ClusterDomainEvent,
CurrentClusterState,
InitialStateAsEvents,
InitialStateAsSnapshot,
SubscriptionInitialStateMode
}
import akka.cluster.ClusterEvent._
import zio.Exit.{ Failure, Success }
import zio.{ Queue, Runtime, Task, ZIO }
import zio.{ Has, Queue, Runtime, Task, ZIO }

object Cluster {

private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] =
private val cluster: ZIO[Has[ActorSystem], Throwable, akka.cluster.Cluster] =
for {
actorSystem <- ZIO.environment[ActorSystem]
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
cluster <- Task(akka.cluster.Cluster(actorSystem))
} yield cluster

/**
* Returns the current state of the cluster.
*/
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] =
val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] =
for {
cluster <- cluster
state <- Task(cluster.state)
Expand All @@ -31,7 +25,7 @@ object Cluster {
/**
* Joins a cluster using the provided seed nodes.
*/
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] =
def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] =
for {
cluster <- cluster
_ <- Task(cluster.joinSeedNodes(seedNodes))
Expand All @@ -40,7 +34,7 @@ object Cluster {
/**
* Leaves the current cluster.
*/
val leave: ZIO[ActorSystem, Throwable, Unit] =
val leave: ZIO[Has[ActorSystem], Throwable, Unit] =
for {
cluster <- cluster
_ <- Task(cluster.leave(cluster.selfAddress))
Expand All @@ -52,7 +46,9 @@ object Cluster {
* To unsubscribe, use `queue.shutdown`.
* To use a bounded queue, see `clusterEventsWith`.
*/
def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] =
def clusterEvents(
initialStateAsEvents: Boolean = false
): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] =
Queue.unbounded[ClusterDomainEvent].tap(clusterEventsWith(_, initialStateAsEvents))

/**
Expand All @@ -63,10 +59,10 @@ object Cluster {
def clusterEventsWith(
queue: Queue[ClusterDomainEvent],
initialStateAsEvents: Boolean = false
): ZIO[ActorSystem, Throwable, Unit] =
): ZIO[Has[ActorSystem], Throwable, Unit] =
for {
rts <- Task.runtime
actorSystem <- ZIO.environment[ActorSystem]
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
_ <- Task(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents))))
} yield ()

Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/zio/akka/cluster/pubsub/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.akka.cluster.pubsub
import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.pubsub.DistributedPubSub
import zio.akka.cluster.pubsub.impl.{ PublisherImpl, SubscriberImpl }
import zio.{ Queue, Task, ZIO }
import zio.{ Has, Queue, Task, ZIO }

/**
* A `Publisher[A]` is able to send messages of type `A` through Akka PubSub.
Expand Down Expand Up @@ -35,9 +35,9 @@ object PubSub {
/**
* Creates a new `Publisher[A]`.
*/
def createPublisher[A]: ZIO[ActorSystem, Throwable, Publisher[A]] =
def createPublisher[A]: ZIO[Has[ActorSystem], Throwable, Publisher[A]] =
for {
actorSystem <- ZIO.environment[ActorSystem]
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
mediator <- getMediator(actorSystem)
} yield new Publisher[A] with PublisherImpl[A] {
override val getMediator: ActorRef = mediator
Expand All @@ -46,9 +46,9 @@ object PubSub {
/**
* Creates a new `Subscriber[A]`.
*/
def createSubscriber[A]: ZIO[ActorSystem, Throwable, Subscriber[A]] =
def createSubscriber[A]: ZIO[Has[ActorSystem], Throwable, Subscriber[A]] =
for {
actorSystem <- ZIO.environment[ActorSystem]
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
mediator <- getMediator(actorSystem)
} yield new Subscriber[A] with SubscriberImpl[A] {
override val getActorSystem: ActorSystem = actorSystem
Expand All @@ -58,9 +58,9 @@ object PubSub {
/**
* Creates a new `PubSub[A]`.
*/
def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] =
def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] =
for {
actorSystem <- ZIO.environment[ActorSystem]
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
mediator <- getMediator(actorSystem)
} yield new PubSub[A] with PublisherImpl[A] with SubscriberImpl[A] {
override val getActorSystem: ActorSystem = actorSystem
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorContext, ActorRef, ActorSystem, PoisonPill, Prop
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import zio.akka.cluster.sharding
import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PoisonPillPayload }
import zio.{ Ref, Runtime, Task, UIO, ZIO }
import zio.{ Has, Ref, Runtime, Task, UIO, ZIO }

/**
* A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one.
Expand All @@ -31,10 +31,10 @@ object Sharding {
name: String,
onMessage: Msg => ZIO[Entity[State], Nothing, Unit],
numberOfShards: Int = 100
): ZIO[ActorSystem, Throwable, Sharding[Msg]] =
): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] =
for {
rts <- ZIO.runtime[ActorSystem]
actorSystem = rts.environment
rts <- ZIO.runtime[Has[ActorSystem]]
actorSystem = rts.environment.get
shardingRegion <- Task(
ClusterSharding(actorSystem).start(
typeName = name,
Expand Down Expand Up @@ -68,10 +68,10 @@ object Sharding {
name: String,
role: Option[String],
numberOfShards: Int = 100
): ZIO[ActorSystem, Throwable, Sharding[Msg]] =
): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] =
for {
rts <- ZIO.runtime[ActorSystem]
actorSystem = rts.environment
rts <- ZIO.runtime[Has[ActorSystem]]
actorSystem = rts.environment.get
shardingRegion <- Task(
ClusterSharding(actorSystem).startProxy(
typeName = name,
Expand Down
69 changes: 34 additions & 35 deletions src/test/scala/zio/akka/cluster/ClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,43 @@ package zio.akka.cluster
import akka.actor.ActorSystem
import akka.cluster.ClusterEvent.MemberLeft
import com.typesafe.config.{ Config, ConfigFactory }
import zio.{ Managed, Task }
import zio.test._
import zio.test.Assertion._
import zio.test._
import zio.test.environment.TestEnvironment
import zio.{ Managed, Task, ZLayer }

object ClusterSpec extends DefaultRunnableSpec {

object ClusterSpec
extends DefaultRunnableSpec(
suite("ClusterSpec")(
testM("receive cluster events") {
val config: Config = ConfigFactory.parseString(s"""
|akka {
| actor {
| provider = "cluster"
| }
| remote {
| netty.tcp {
| hostname = "127.0.0.1"
| port = 2551
| }
| }
| cluster {
| seed-nodes = ["akka.tcp://[email protected]:2551"]
| }
|}
def spec: ZSpec[TestEnvironment, Any] =
suite("ClusterSpec")(
testM("receive cluster events") {
val config: Config = ConfigFactory.parseString(s"""
|akka {
| actor {
| provider = "cluster"
| }
| remote {
| netty.tcp {
| hostname = "127.0.0.1"
| port = 2551
| }
| }
| cluster {
| seed-nodes = ["akka.tcp://[email protected]:2551"]
| }
|}
""".stripMargin)

val actorSystem: Managed[Throwable, ActorSystem] =
Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either)
val actorSystem: Managed[Throwable, ActorSystem] =
Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either)

assertM(
actorSystem.use(actorSystem =>
(for {
queue <- Cluster.clusterEvents()
_ <- Cluster.leave
item <- queue.take
} yield item).provide(actorSystem)
),
isSubtype[MemberLeft](anything)
)
}
)
assertM(
for {
queue <- Cluster.clusterEvents()
_ <- Cluster.leave
item <- queue.take
} yield item
)(isSubtype[MemberLeft](anything)).provideLayer(ZLayer.fromManaged(actorSystem))
}
)
}
Loading

0 comments on commit 4e09c7b

Please sign in to comment.