Skip to content

Commit

Permalink
Implement support for the ask pattern (#64)
Browse files Browse the repository at this point in the history
* Implement support for the ask pattern

Co-authored-by: name <[email protected]>

* Update documentation and Scaladoc

Co-authored-by: FrancisToth <[email protected]>

Co-authored-by: name <[email protected]>
  • Loading branch information
calvinlfer and FrancisToth authored Apr 14, 2020
1 parent b94246a commit a6ff755
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 14 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,17 @@ It requires:

You can also use `Sharding.startProxy` if you need to send messages to entities located on `other` nodes.

To send a message to a sharded entity, use `send`. To stop one, use `stop`.
To send a message to a sharded entity without expecting a response, use `send`. To send a message to a sharded entity expecting a response, use `ask`. To stop one, use `stop`.
The `entityId` identifies the entity to target. Messages sent to the same `entityId` from different nodes in the cluster will be handled by the same actor.

```scala
def send(entityId: String, data: M): Task[Unit]
def ask[R](entityId: String, data: M): Task[R]
def stop(entityId: String): Task[Unit]
```

**Note on Serialization**
Akka messages are serialized when they are sent across the network. By default, Java serialization is used but it is not recommended to use it in production.
Akka messages are serialized when they are sent across the network. By default, Java serialization is used, but it is not recommended in production.
See [Akka Documentation](https://doc.akka.io/docs/akka/current/serialization.html) to see how to provide your own serializer.
This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelope` case class, so your serializer needs to cover it as well.

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/zio/akka/cluster/sharding/Entity.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package zio.akka.cluster.sharding

import zio.{ Ref, UIO }
import zio.{ Ref, Task, UIO }

trait Entity[State] {
def replyToSender[R](msg: R): Task[Unit]
def id: String
def state: Ref[Option[State]]
def stop: UIO[Unit]
Expand Down
42 changes: 31 additions & 11 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package zio.akka.cluster.sharding

import akka.actor.{ Actor, ActorContext, ActorRef, ActorSystem, PoisonPill, Props }
import akka.pattern.{ ask => askPattern }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import akka.util.Timeout
import zio.akka.cluster.sharding
import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PoisonPillPayload }
import zio.{ Has, Ref, Runtime, Task, UIO, ZIO }
import zio.{ =!=, Has, Ref, Runtime, Task, UIO, ZIO }

import scala.concurrent.duration._
import scala.reflect.ClassTag

/**
* A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one.
Expand All @@ -15,6 +20,8 @@ trait Sharding[M] {

def stop(entityId: String): Task[Unit]

def ask[R](entityId: String, data: M)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R]

}

object Sharding {
Expand All @@ -25,12 +32,14 @@ object Sharding {
* @param name the name of the entity type
* @param onMessage the behavior of the entity when it receives a message
* @param numberOfShards a fixed number of shards
* @param askTimeout a finite duration specifying how long an ask is allowed to wait for an entity to respond
* @return a [[Sharding]] object that can be used to send messages to sharded entities
*/
def start[Msg, State](
name: String,
onMessage: Msg => ZIO[Entity[State], Nothing, Unit],
numberOfShards: Int = 100
numberOfShards: Int = 100,
askTimeout: FiniteDuration = 10.seconds
): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] =
for {
rts <- ZIO.runtime[Has[ActorSystem]]
Expand All @@ -54,20 +63,23 @@ object Sharding {
)
} yield new ShardingImpl[Msg] {
override val getShardingRegion: ActorRef = shardingRegion
override implicit val timeout: Timeout = Timeout(askTimeout)
}

/**
* Starts cluster sharding in proxy mode for a given entity type.
* Starts cluster sharding in proxy mode for a given entity type.
*
* @param name the name of the entity type
* @param role an optional role to specify that this entity type is located on cluster nodes with a specific role
* @param name the name of the entity type
* @param role an optional role to specify that this entity type is located on cluster nodes with a specific role
* @param numberOfShards a fixed number of shards
* @param askTimeout a finite duration specifying how long an ask is allowed to wait for an entity to respond
* @return a [[Sharding]] object that can be used to send messages to sharded entities on other nodes
*/
def startProxy[Msg](
name: String,
role: Option[String],
numberOfShards: Int = 100
numberOfShards: Int = 100,
askTimeout: FiniteDuration = 10.seconds
): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] =
for {
rts <- ZIO.runtime[Has[ActorSystem]]
Expand All @@ -89,18 +101,25 @@ object Sharding {
)
)
} yield new ShardingImpl[Msg] {
override val timeout: Timeout = Timeout(askTimeout)
override val getShardingRegion: ActorRef = shardingRegion
}

private[sharding] trait ShardingImpl[Msg] extends Sharding[Msg] {

implicit val timeout: Timeout
val getShardingRegion: ActorRef

override def send(entityId: String, data: Msg): Task[Unit] =
Task(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data)))

override def stop(entityId: String): Task[Unit] =
Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload))

override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] =
Task.fromFuture(_ =>
(getShardingRegion ? sharding.MessageEnvelope(entityId, MessagePayload(data)))
.mapTo[R]
)
}

private[sharding] class ShardEntity[Msg, State](rts: Runtime[Any])(
Expand All @@ -110,12 +129,13 @@ object Sharding {
val ref: Ref[Option[State]] = rts.unsafeRun(Ref.make[Option[State]](None))
val actorContext: ActorContext = context
val entity: Entity[State] = new Entity[State] {
override def id: String = context.self.path.name
override def state: Ref[Option[State]] = ref
override def stop: UIO[Unit] = UIO(actorContext.stop(self))
override def id: String = context.self.path.name
override def state: Ref[Option[State]] = ref
override def stop: UIO[Unit] = UIO(actorContext.stop(self))
override def replyToSender[R](msg: R): Task[Unit] = Task(context.sender() ! msg)
}

def receive: PartialFunction[Any, Unit] = {
def receive: Receive = {
case MessagePayload(msg) =>
rts.unsafeRunSync(onMessage(msg.asInstanceOf[Msg]).provide(entity))
()
Expand Down
10 changes: 10 additions & 0 deletions src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ object ShardingSpec extends DefaultRunnableSpec {
} yield res
)(equalTo(msg)).provideLayer(actorSystem)
},
testM("send and receive a message using ask") {
val onMessage: String => ZIO[Entity[Any], Nothing, Unit] =
incomingMsg => ZIO.accessM[Entity[Any]](r => r.replyToSender(incomingMsg).orDie)
assertM(
for {
sharding <- Sharding.start(shardName, onMessage)
reply <- sharding.ask[String](shardId, msg)
} yield reply
)(equalTo(msg)).provideLayer(actorSystem)
},
testM("gather state") {
assertM(
for {
Expand Down

0 comments on commit a6ff755

Please sign in to comment.