Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: grpc sample restaurant order replication #963

Merged
merged 6 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions samples/grpc/local-drone-control-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,43 @@ located geographically close to the actual drones for short latencies and resili
Drones interact with the closest control service in the following ways:

* Report their precise location, at a high frequency
* FIXME Ask for the next delivery to perform
* Ask for the next delivery to perform
* Mark a delivery as completed

The control service interacts with the global cloud service, represented by the separate
restaurant-drone-deliveries-service sample, in the following ways:

* Replicates a coarse grained location of each drone to the cloud, at a lower frequency,
only when they change location at a coarse grained grid
* FIXME get restaurant to home delivery orders in the geographical area of the local drone control
* get restaurant to home delivery orders in the geographical area of the local drone control

## Running the sample

Start one instance with:

```
```shell
sbt run
```

Posting updated location for a drone:

```shell
grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
```

Request assignment of a delivery (it needs to have reported location at least once first)

```shell
grpcurl -d '{"drone_id":"drone1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.RequestNextDelivery
```
grpcurl -d '{"drone_id":"drone1", "longitude": 18.07125, "latitude": 59.31834}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation

Mark the delivery as completed
```shell
grpcurl -d '{"delivery_id":"order1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.CompleteDelivery
```

Inspect the current state of the local delivery queue

```shell
grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue
```
1 change: 1 addition & 0 deletions samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ licenses := Seq(
scalaVersion := "2.13.11"

Compile / scalacOptions ++= Seq(
"-release:11",
"-deprecation",
"-feature",
"-unchecked",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "central.deliveries.proto";

package central.deliveries;

// copy of the descriptor from restaurant-drone-deliveries who owns/publishes the events

import "common/coordinates.proto";

message DeliveryRegistered {
string delivery_id = 1;
common.Coordinates origin = 2;
common.Coordinates destination = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "common.proto";

package common;

// generic messages, shared between local-drone-control and restaurant-drone-deliveries
johanandren marked this conversation as resolved.
Show resolved Hide resolved

message Coordinates {
// latitude (north-south) in decimal degree coordinates
double latitude = 1;
// longitude (east west) in decimal degree coordinates
double longitude = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "local.drones.proto";

import "google/protobuf/empty.proto";
import "common/coordinates.proto";

package local.drones;

// gRPC definition for DroneService, for drones to interact with

service DeliveriesQueueService {
rpc GetCurrentQueue (google.protobuf.Empty) returns (GetCurrentQueueResponse) {}
}

message GetCurrentQueueResponse {
repeated WaitingDelivery waitingDeliveries = 1;
repeated DeliveryInProgress deliveriesInProgress = 2;
}

message WaitingDelivery {
string delivery_id = 1;
common.Coordinates from = 2;
common.Coordinates to = 3;
}

message DeliveryInProgress {
string delivery_id = 1;
string drone_id = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,38 @@ option java_multiple_files = true;
option java_package = "local.drones.proto";

import "google/protobuf/empty.proto";
import "common/coordinates.proto";

package local.drones;

// gRPC definition for DroneService
// gRPC definition for DroneService, for drones to interact with

service DroneService {
rpc ReportLocation (ReportLocationRequest) returns (google.protobuf.Empty) {}

// deliveries
rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {}
rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {}
}


message ReportLocationRequest {
string drone_id = 1;
// longitude in decimal degree coordinates
double longitude = 2;
// latitude in decimal degree coordinates
double latitude = 3;
common.Coordinates coordinates = 2;
// altitude in meters
double altitude = 4;
}

message RequestNextDeliveryRequest {
string drone_id = 1;
}

message RequestNextDeliveryResponse {
string delivery_id = 1;
common.Coordinates from = 2;
common.Coordinates to = 3;
}

message CompleteDeliveryRequest {
string delivery_id = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ option java_package = "local.drones.proto";

package local.drones;

// events published by the drones
import "common/coordinates.proto";

// events published by the drone entity, for consumption/push to the cloud

message CoarseDroneLocation {
string drone_id = 1;
double latitude = 2;
double longitude = 3;
// coordinates but truncated to a coarse grained location
common.Coordinates coordinates = 1;
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ akka {
}

local-drone-control {
# unique identifier for the instance of local control, for example geographic location of the
# node, or ip
service-id = "kungsholmen"
service-id = ${?SERVICE_ID}
drone-service {
ask-timeout = 3s
}
# unique identifier for the instance of local control, must be known up front by the cloud service
location-id = "sweden/stockholm/kungsholmen"
location-id = ${?LOCATION_ID}

ask-timeout = 3s
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ akka.grpc.client.central-drone-control = {
host = "127.0.0.1"
port = 8101
use-tls = false
}

akka.projection.grpc.consumer {
client {
# same as for producer above, so re-use config from there
host = ${akka.grpc.client.central-drone-control.host}
port = ${akka.grpc.client.central-drone-control.port}
use-tls = ${akka.grpc.client.central-drone-control.use-tls}
}
stream-id = "delivery-events"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ akka {
snapshot-store {
plugin = "akka.persistence.r2dbc.snapshot"
}
state {
plugin = "akka.persistence.r2dbc.state"
}

r2dbc {
# Single projection instance, no need for many topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,37 @@ package local.drones
/**
* Decimal degree coordinates
*/
final case class Coordinates(latitude: Double, longitude: Double)
final case class Coordinates(latitude: Double, longitude: Double) {

import Coordinates._

def distanceTo(other: Coordinates): Long = {
// using the haversine formula https://en.wikipedia.org/wiki/Versine#hav
val latitudeDistance = Math.toRadians(latitude - other.latitude)
val longitudeDistance = Math.toRadians(longitude - other.longitude)
val sinLatitude = Math.sin(latitudeDistance / 2)
val sinLongitude = Math.sin(longitudeDistance / 2)
val a = sinLatitude * sinLatitude +
(Math.cos(Math.toRadians(latitude)) *
Math.cos(Math.toRadians(other.latitude)) *
sinLongitude * sinLongitude)
val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
(roughlyRadiusOfEarthInM * c).toLong
}

def toProto: common.proto.Coordinates =
common.proto.Coordinates(latitude, longitude)

}

object Coordinates {

private val roughlyRadiusOfEarthInM = 6371000

def fromProto(pc: common.proto.Coordinates): Coordinates =
Coordinates(pc.latitude, pc.longitude)

}

final case class Position(coordinates: Coordinates, altitudeMeters: Double)

Expand All @@ -19,4 +49,7 @@ object CoarseGrainedCoordinates {

}

final case class CoarseGrainedCoordinates(latitude: Double, longitude: Double)
final case class CoarseGrainedCoordinates(latitude: Double, longitude: Double) {
def toProto: common.proto.Coordinates =
common.proto.Coordinates(latitude, longitude)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package local.drones

import akka.Done
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.{ DurableStateBehavior, Effect }

import java.time.Instant

object DeliveriesQueue {

sealed trait Command extends CborSerializable

final case class AddDelivery(
waitingDelivery: WaitingDelivery,
replyTo: ActorRef[Done])
extends Command

final case class RequestDelivery(
droneId: String,
droneCoordinates: Coordinates,
replyTo: ActorRef[StatusReply[WaitingDelivery]])
extends Command

final case class CompleteDelivery(
deliveryId: String,
replyTo: ActorRef[StatusReply[Done]])
extends Command

final case class GetCurrentState(replyTo: ActorRef[State]) extends Command

final case class WaitingDelivery(
deliveryId: String,
from: Coordinates,
to: Coordinates)

final case class DeliveryInProgress(
deliveryId: String,
droneId: String,
pickupTime: Instant)
final case class State(
waitingDeliveries: Vector[WaitingDelivery],
deliveriesInProgress: Vector[DeliveryInProgress])
extends CborSerializable

// Not really an entity, we just have one
val EntityKey = EntityTypeKey("RestaurantDeliveries")

def apply(): Behavior[Command] = {
Behaviors.setup { context =>
DurableStateBehavior[Command, State](
PersistenceId(EntityKey.name, "DeliveriesQueue"),
State(Vector.empty, Vector.empty),
onCommand(context))
}
}

private def onCommand(context: ActorContext[Command])(
state: State,
command: Command): Effect[State] =
command match {
case AddDelivery(delivery, replyTo) =>
context.log.info("Adding delivery [{}] to queue", delivery.deliveryId)
if (state.waitingDeliveries.contains(delivery))
johanandren marked this conversation as resolved.
Show resolved Hide resolved
Effect.reply(replyTo)(Done)
else
Effect
.persist(
state.copy(waitingDeliveries =
state.waitingDeliveries :+ delivery))
.thenReply(replyTo)(_ => Done)

case RequestDelivery(droneId, droneCoordinates, replyTo) =>
if (state.waitingDeliveries.isEmpty)
Effect.reply(replyTo)(StatusReply.Error("No waiting orders"))
else {
val closestPickupForDrone = state.waitingDeliveries.minBy(delivery =>
droneCoordinates.distanceTo(delivery.from))
context.log.info(
"Selected next delivery [{}] for drone [{}]",
closestPickupForDrone.deliveryId,
droneId)
// Note: A real application would have to care more about retries/lost data here
Effect
.persist(
state.copy(
waitingDeliveries =
state.waitingDeliveries.filterNot(_ == closestPickupForDrone),
state.deliveriesInProgress :+ DeliveryInProgress(
closestPickupForDrone.deliveryId,
droneId,
Instant.now())))
.thenReply(replyTo)(_ => StatusReply.Success(closestPickupForDrone))
}

case CompleteDelivery(deliveryId, replyTo) =>
if (!state.deliveriesInProgress.exists(_.deliveryId == deliveryId)) {
Effect.reply(replyTo)(
StatusReply.Error(s"Unknown delivery id: ${deliveryId}"))
} else {
Effect
.persist(
state.copy(deliveriesInProgress =
state.deliveriesInProgress.filter(_.deliveryId == deliveryId)))
johanandren marked this conversation as resolved.
Show resolved Hide resolved
.thenReply(replyTo)(_ => StatusReply.Success(Done))
}

case GetCurrentState(replyTo) =>
Effect.reply(replyTo)(state)
}

}
Loading