From c188404357c2e89c3bce6ae2775e60976a6cf034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 6 Sep 2023 15:27:05 +0200 Subject: [PATCH] docs: Edge guide (#983) --- .../guide/1-event-sourced-shopping-cart.md | 2 +- .../paradox/guide/2-service-to-service.md | 2 +- .../src/main/paradox/guide/3-active-active.md | 2 +- akka-edge-docs/src/main/paradox/guide.md | 23 +- .../guide/1-local-drone-control-service.md | 220 ++++++++++++ .../2-drone-location-to-cloud-service.md | 318 ++++++++++++++++++ .../guide/3-restaurant-deliveries-service.md | 153 +++++++++ .../guide/4-local-drone-delivery-selection.md | 273 +++++++++++++++ .../guide/5-deploying-delivery-service.md | 5 + ...6-deploying-local-drone-control-service.md | 5 + .../paradox/images/guide-section-1.drawio | 68 ++++ .../main/paradox/images/guide-section-1.svg | 3 + .../paradox/images/guide-section-2.drawio | 161 +++++++++ .../main/paradox/images/guide-section-2.svg | 3 + .../paradox/images/guide-section-3.drawio | 61 ++++ .../main/paradox/images/guide-section-3.svg | 3 + .../paradox/images/guide-section-4.drawio | 173 ++++++++++ .../main/paradox/images/guide-section-4.svg | 3 + build.sbt | 5 +- .../java/local/drones/DeliveriesQueue.java | 7 +- .../java/local/drones/DeliveryEvents.java | 16 +- .../src/main/java/local/drones/Drone.java | 33 +- .../java/local/drones/DroneServiceImpl.java | 3 +- .../local/drones/LocalDroneControlServer.java | 2 + .../src/main/java/local/drones/Main.java | 2 + .../src/main/resources/grpc.conf | 2 + .../src/main/resources/persistence.conf | 5 +- .../grpc/local-drone-control-scala/build.sbt | 1 - .../src/main/resources/grpc.conf | 2 + .../src/main/resources/persistence.conf | 5 +- .../scala/local/drones/DeliveriesQueue.scala | 7 + .../scala/local/drones/DeliveryEvents.scala | 14 +- .../src/main/scala/local/drones/Drone.scala | 33 ++ .../scala/local/drones/DroneServiceImpl.scala | 3 +- .../drones/LocalDroneControlServer.scala | 3 +- .../src/main/scala/local/drones/Main.scala | 5 +- .../ddl-scripts/create_tables.sql | 2 + .../java/central/DroneDeliveriesServer.java | 3 + .../deliveries/RestaurantDeliveries.java | 39 ++- .../src/main/java/central/drones/Drone.java | 8 + .../drones/DroneOverviewServiceImpl.java | 1 + .../java/central/drones/LocalDroneEvents.java | 4 + .../src/main/resources/persistence.conf | 4 +- .../README.md | 2 +- .../ddl-scripts/create_tables.sql | 2 + .../src/main/resources/persistence.conf | 4 +- .../scala/central/DroneDeliveriesServer.scala | 2 + .../central/deliveries/DeliveryEvents.scala | 3 - .../deliveries/RestaurantDeliveries.scala | 25 +- .../src/main/scala/central/drones/Drone.scala | 10 + .../central/drones/LocalDroneEvents.scala | 7 +- 51 files changed, 1679 insertions(+), 63 deletions(-) create mode 100644 akka-edge-docs/src/main/paradox/guide/1-local-drone-control-service.md create mode 100644 akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md create mode 100644 akka-edge-docs/src/main/paradox/guide/3-restaurant-deliveries-service.md create mode 100644 akka-edge-docs/src/main/paradox/guide/4-local-drone-delivery-selection.md create mode 100644 akka-edge-docs/src/main/paradox/guide/5-deploying-delivery-service.md create mode 100644 akka-edge-docs/src/main/paradox/guide/6-deploying-local-drone-control-service.md create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-1.drawio create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-1.svg create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-2.drawio create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-2.svg create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-3.drawio create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-3.svg create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-4.drawio create mode 100644 akka-edge-docs/src/main/paradox/images/guide-section-4.svg diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md b/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md index 23604416d..77b9a6bf7 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md @@ -138,7 +138,7 @@ Java ## Running the sample -The complete sample can be downloaded from github, but note that it also includes the next step of the guide: +The complete sample can be downloaded from GitHub, but note that it also includes the next step of the guide: * Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-java * Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-scala diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md b/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md index e67a59dcc..84905acd7 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md @@ -171,7 +171,7 @@ See @apidoc[akka.projection.grpc.consumer.ConsumerFilter] for full API documenta ## Complete Sample Projects -The complete sample can be downloaded from github, the shopping cart: +The complete sample can be downloaded from GitHub, the shopping cart: * Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-java * Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-scala diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md b/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md index e06ff66dc..027133c8f 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md @@ -147,7 +147,7 @@ Java ## Complete Sample Projects -The complete sample can be downloaded from github, the replicated shopping cart: +The complete sample can be downloaded from GitHub, the replicated shopping cart: * Java: https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-java * Scala: https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-scala diff --git a/akka-edge-docs/src/main/paradox/guide.md b/akka-edge-docs/src/main/paradox/guide.md index 2f73ba086..f2f628fe4 100644 --- a/akka-edge-docs/src/main/paradox/guide.md +++ b/akka-edge-docs/src/main/paradox/guide.md @@ -1,4 +1,25 @@ # Guide -As a hands-on guide we will step by step implement ... FIXME +As a hands-on guide we will step by step implement a drone restaurant to customer delivery service where orders for deliveries +between restaurants and customers are created in a service in the cloud. + +The drones continuously inform their PoP (point of presence) local control center about their exact location. An approximate +location of the drones are replicated to the cloud service at a much lower frequency, only when the drones change +location on a coarse grained GPS coordinate grid. + +The cloud service accepts restaurant orders and replicate them to the right control center. Drones interact with +the local center to pick up available orders closest to their location. + +@@toc { depth=2 } + +@@@ index + +1. [Local Drone Control Service](guide/1-local-drone-control-service.md) +2. [Coarse Grained Location Replication](guide/2-drone-location-to-cloud-service.md) +3. [Restaurant Deliveries Service](guide/3-restaurant-deliveries-service.md) +4. [Local Drone Delivery Selection](guide/4-local-drone-delivery-selection.md) +5. [Deploying the Restaurant Delivery Service](guide/5-deploying-delivery-service.md) +6. [Deploying the Local Drone Control Service](guide/6-deploying-local-drone-control-service.md) + +@@@ diff --git a/akka-edge-docs/src/main/paradox/guide/1-local-drone-control-service.md b/akka-edge-docs/src/main/paradox/guide/1-local-drone-control-service.md new file mode 100644 index 000000000..4b8621412 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/1-local-drone-control-service.md @@ -0,0 +1,220 @@ +# Local Drone Control Service + +As the other features of Akka Edge are build on top of Event Sourcing, let us start by implementing a digital twin +for drones using the @extref[Akka Event Sourced Behavior API](akka:typed/persistence.html). + +![Diagram showing 2 drone entities in a Local Drone Control Service](../images/guide-section-1.svg) + +We will represent drone as an Event Sourced entity, if you are unfamiliar with Event Sourcing, refer to the +@extref[Event Sourcing section in the Akka guide](akka-guide:concepts/event-sourcing.html) for an explanation. +The [Event Sourcing with Akka video](https://akka.io/blog/news/2020/01/07/akka-event-sourcing-video) is also a good starting point for learning Event Sourcing. + +For drones to communicate their location to the digital twin we will create a gRPC API. + +When this first step is completed, the drones will be able to report their location and users inspect the current +location of a drone connected to the local control center PoP. + +## Implementing a Drone digital twin + +### Commands and events + +Commands are the public API of an entity that other parts of the system use to interact with it. Entity state can only +be changed by commands. The results of commands are emitted as events. A command can request state changes, and different +events might be generated depending on the current state of the entity. A command can also be rejected if it has invalid +input or can’t be handled by the current state of the entity. + +The Drone only accepts two commands: `ReportLocation` and `GetLocation`. When the reported location changes it always persists +a `PositionUpdated` event, but additionally, whenever the position means it changed place on a more coarse grained grid, +it also emits a `CoarseGrainedLocationChanged` event. We will revisit the reason for the coarse grained event in the next +step in this guide. + +The definition of the commands and events look like this: + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #commands #events } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #commands #events } + +### State + +When the location is reported it is kept as a `currentState`, additionally the 100 previous reported locations are kept +in a list. + +The list of historical locations is not currently used for anything but is here to show that an entity could keep a time window of +fine-grained information to make local decisions at a detail level that would be impractical and maybe not even interesting +to report to a central cloud service. + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #state } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #state } + + +### Command handler + +The Drone entity will receive commands that report when the Drone changes location. We will implement a command handler to process these commands and emit a reply. + +The command handler for the Drone looks like this: + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #commandHandler } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #commandHandler } + +Note how the handler de-duplicates reports of the same location, immediately replying with an acknowledgement without +persisting any change. + +In addition to storing the `PositionUpdated` event, the command handler also calculates a coarse grained location, and +persists a `CoarseGrainedLocationChanged` event as well, if it did change from the previous coarse grained location. + +### Event handler + +From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity’s event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal. + +The event handler only reacts to the `PositionUpdated` event and ignores the `CoarseGrainedLocationChanged` as the coarse +grained location can be calculated from the more fine-grained position coordinates: + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #eventHandler } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #eventHandler } + +### Serialization + +The state and events of the entity must be serializable because they are written to the datastore, if the local drone control needs to scale out across several nodes to handle traffic, the commands would also be sent between nodes within the Akka cluster. The sample project includes built-in CBOR serialization using the @extref[Akka Serialization Jackson module](akka:serialization-jackson.html). This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included. + +The state, commands and events are marked as CborSerializable which is configured to use the built-in CBOR serialization. The sample project includes this marker interface CborSerializable: + +Scala +: @@snip [CborSerializable.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/CborSerializable.scala) { } + +Java +: @@snip [CborSerializable.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/CborSerializable.java) { } + + +Configuration in the application configuration to select the serializer: + +Scala +: @@snip [serialization.conf](/samples/grpc/local-drone-control-scala/src/main/resources/serialization.conf) { } + +Java +: @@snip [serialization.conf](/samples/grpc/local-drone-control-java/src/main/resources/serialization.conf) { } + +### Journal storage + +In this sample we use Akka Persistence R2DBC with the H2 in-process database, with a file backed storage. H2 requires no +additional external database service so can be convenient for both development and production usage where only a single +node interacts with the journal and overhead needs to be kept low. + +It is of course also possible to instead use a separate standalone database such as for example PostgreSQL. + +Config to use H2 looks like this: + +Scala +: @@snip [persistence.conf](/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf) { } + +Java +: @@snip [persistence.conf](/samples/grpc/local-drone-control-java/src/main/resources/persistence.conf) { } + +In addition to the configuration, the following additional dependencies are needed in the project build: + +@@dependency [sbt,Maven,Gradle] { +group=com.h2database +artifact=h2 +version=$h2.version$ +group2=io.r2dbc +artifact2=r2dbc-h2 +version2=$r2dbc-h2.version$ +} + +## gRPC Service API for the drone communication + +To allow drones to actually use the service we need a public API reachable over the network. For this we will use @extref[Akka gRPC](akka-grpc:) +giving us a type safe, efficient protocol that allows clients to be written in many languages. + +The service descriptor for the API is defined in protobuf, it implements the report command that entity accepts but not +one matching the get location command: + +Scala +: @@snip [local.drones.drone_api.proto](/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto) { } + +Java +: @@snip [local.drones.drone_api.proto](/samples/grpc/local-drone-control-java/src/main/protobuf/local/drones/drone_api.proto) { } + +When compiling the project the Akka gRPC @scala[sbt]@java[maven] plugin generates a service interface for us to implement. +Our implementation of it interacts with the entity: + +Scala +: @@snip [DroneServiceImpl.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala) { } + +Java +: @@snip [DroneServiceImpl.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java) { } + +Finally, we need to start the gRPC server, making service implementation available for calls from drones: + +Scala +: @@snip [LocalDroneControlServer.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala) { #bind } + +Java +: @@snip [LocalDroneControlServer.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/LocalDroneControlServer.java) { #bind } + +The Akka HTTP server must be running with HTTP/2 to serve gRPC, this is done through config: + +Scala +: @@snip [grpc.conf](/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf) { #http2 } + +Java +: @@snip [grpc.conf](/samples/grpc/local-drone-control-java/src/main/resources/grpc.conf) { #http2 } + +## Wiring it all up + +The main of this service starts up an actor system with a root behavior which is responsible for bootstrapping all +parts of the application. + +Note that the bootstrap contains some parts not yet described, the `DroneEvents` and the `DeliveriesQueue`. +They will be covered in the following sections of this guide and can be ignored for now. + +Scala +: @@snip [Main.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala) { #main } + +Java +: @@snip [Main.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Main.java) { #main } + +## Running the sample + +The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide: + +* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/local-drone-control-service-java +* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/local-drone-control-service-scala + +@@@ div { .group-scala } + +To start the sample: + +```shell +sbt run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec +``` + +@@@ + +Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + +```shell +grpgrpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation +``` + +## What's next? + +* Consuming the published coarse grained drone locations in a cloud service \ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md new file mode 100644 index 000000000..9cae370f6 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md @@ -0,0 +1,318 @@ +# Coarse Grained Location Replication + +In the previous step of the guide we implemented the PoP local control service keeping track of drone locations. + +We also want to publish coarse grained location updates at a low frequency from the edge to a central cloud service. + +![Diagram showing local drone entities replication to cloud service](../images/guide-section-2.svg) + +In this step we will cover publishing and consuming those events, passing them to a stateful overview-version of the +drone digital twin. We will use @extref[Akka Projection gRPC](akka-projection:grpc.html) to do service-to-service events passing with an +effectively once delivery guarantee, this without requiring a message broker in between services. The cloud and the many +PoP services each has their own lifecycle and are deployed separately in different places. + +We will then implement a gRPC service for querying the coarse grained location of the global set of drones in the cloud service. + +## Coarse grained location aggregation and publishing + +We have already seen the additional `CoarseGrainedLocationChanged` event persisted in the previous step of the guide. +Now we will update the local-drone-control service to also publish these aggregate events upstream to a cloud service +so that it can keep a rough overview of where all drones are without needing to handle the global load of detailed and +frequent updates from all drones. + +Normally for Akka gRPC projections the consumer initiates the connection, but in edge scenarios it might be problematic +because of firewalls not allowing the cloud to connect to each PoP. The normal consumer initiated connections also means +that all producers must be known up front by the consumer. + +To solve this the local control center push events to the cloud using @extref[Akka gRPC projection with producer push](akka-projection:grpc-producer-push.html) +which means the control center will initiate the connection. + +The actual pushing of events is implemented as a single actor behavior, if partitioning is needed for scaling that is also possible +by letting multiple actors handle partitions of the entire stream of events from local drones. + +Scala +: @@snip [DroneEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala) { } + +Java +: @@snip [DroneEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java) { } + +Two important things to note: + +1. A producer filter is applied to only push `CoarseGrainedLocationChanged` and not the fine-grained `PositionUpdated` events. +2. The internal domain representation of `CoarseGrainedLocationChanged` is transformed into an explicit public protocol + protobuf message `local.drones.proto.CoarseDroneLocation` message, for loose coupling between consumer and producer and + easier evolution over time without breaking wire compatibility. +3. The service defines a "location name" which is a unique identifier of the PoP in the format `country/city/part-of-city`, + it is used as `originId` for the producer push stream, identifying where the stream of events come from. + +## Producer Push Destination + +The producer push destination is a gRPC service where producers push events, the events are persisted in a local journal +as is, or after a transformation to an internal representation. For more details see @extref[Akka gRPC projection with producer push documentation](akka-projection:grpc-producer-push.html). + +We'll implement the producer push destination in a new separate service, intended to run as a clustered deployment in the cloud, where all the +local control services will push their aggregate events, the "Restaurant Drone Deliveries Service". + +In addition to accepting the events, we pick the local control center location (which is in the format `country/city/part-of-city`) passed as +producer `originId` on producer connection, and put it in a tag for the event. + +The setup logic looks like this: + +Scala +: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer } + +Java +: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer } + +The returned @scala[PartialFunction]@java[Function] is an Akka HTTP gRPC request handler that can be bound directly in an HTTP server or +combined with multiple other request handlers and then bound as a single server: + +Scala +: @@snip [DroneDeliveriesServer.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala) { #composeAndBind } + +Java +: @@snip [DroneDeliveriesServer.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java) { #composeAndBind } + +As persistent storage for the event journal we are using PostgreSQL, we cannot use H2 like the local drone control service, as the +central cloud service is clustered and needs an external database that can accept connections from multiple separate cluster nodes. + +Config to use PostgreSQL looks like this: + +Scala +: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf) { } + +Java +: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { } + + +## Consuming the pushed events + +What we have set up only means that the pushed events are written into our local journal, to do something useful with the +events we need to run a projection consuming the events. We'll turn them into commands and send them to an entity. + +The projection is run as @extref[Akka Sharded Daemon Process](akka:typed/cluster-sharded-daemon-process.html) to partition +the global stream of events among multiple consumers balanced over the nodes of the restaurant-drone-deliveries-service. + +The handler of the projection turns the protobuf message `CoarseDroneLocation` pushed by the producer and stored in the +local journal into a `Drone.UpdateLocation` and sends it over Akka Cluster Sharding to the right drone overview entity: + +Scala +: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventProjection } + +Java +: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventProjection } + +FIXME this is a lot in one snippet, split it up in multiple parts? + +## Durable State Drone Overview + +For the cloud representation of the drones, only containing the rough location, we use @extref[Durable State](akka:typed/durable-state/persistence.html), +which instead of event sourcing is more like a key-value storage, where each actor command can lead to persisting the complete state of the actor. + +### Commands + +The cloud drone entity supports two commands, much like the PoP drone entity. The commands are `UpdateLocation` and `GetState`: + +Scala +: @@snip [Drone.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala) { #commands } + +Java +: @@snip [Drone.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java) { #commands } + +### State + +The state of the drone contains the location name of the PoP it talked to last, the coarse grained coordinates it last reported if it did and the +timestamp when that happened: + +Scala +: @@snip [Drone.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala) { #state } + +Java +: @@snip [Drone.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java) { #state } + +The initial state, signifying that the durable state newer saw any update uses a special location "unknown", and +an @scala[`None`]@java[`Optional.empty()`] for its location. + + +Scala +: @@snip [Drone.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala) { #emptyState } + +Java +: @@snip [Drone.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java) { #emptyState } + + +### Command handler + +The command handler turns `UpdateLocation` commands into an update to its state and persists that and replies with the +current state for the `GetState` command: + +Scala +: @@snip [Drone.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala) { #commandHandler } + +Java +: @@snip [Drone.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java) { #commandHandler } + + + +## Storing query representation for easier querying + +The Akka R2DBC plugin contains a feature for @extref[writing durable state query representation](akka-persistence-r2dbc:durable-state-store.html#storing-query-representation) in the same transaction as the state update. We'll use that to write the `locationName` to +a queryable column in the PostgreSQL database. + +First step is to add a `location` column to the durable state table schema: + +Scala +: @@snip [create_tables.sql](/samples/grpc/restaurant-drone-deliveries-service-scala/ddl-scripts/create_tables.sql) { #queryableColumn } + +Java +: @@snip [create_tables.sql](/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql) { #queryableColumn } + +We then implement @scala[`AdditionalColumn[Drone.State, String]`]@java[`AdditionalColumn`]: + +Scala +: @@snip [LocationColumn.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala) { #locationColumn } + +Java +: @@snip [LocationColumn.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java) { } + +Finally, we add configuration to make Akka Persistence R2DBC use the `LocationColumn` when writing state: + +Scala +: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf) { #locationColumn } + +Java +: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { #locationColumn } + + +## gRPC service + +To make it possible for users of our service to query the drone overviews we define a gRPC service: + +Scala +: @@snip [drone_overview_api.proto](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/drones/drone_overview_api.proto) { } + +Java +: @@snip [drone_overview_api.proto](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto) { } + +And implement the service interface Akka gRPC generates for it. + +The `getDroneOverview` method asks the drone entity directly about its current state. + +The `getCoarseDroneLocations` is a bit more involved, querying using the `locations` column, and then using +the Akka Serialization infrastructure to deserialize the found drone `State` instances. Grouping the drones +in the same coarse grained location and finally turning that map into a protobuf response message: + +Scala +: @@snip [DroneOverviewServiceImpl.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/DroneOverviewServiceImpl.scala) { } + +Java +: @@snip [DroneOverviewServiceImpl.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java) { } + +Finally, we need to start the HTTP server, making service implementation available for calls from drones: + +Scala +: @@snip [LocalDroneControlServer.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala) { #composeAndBind } + +Java +: @@snip [LocalDroneControlServer.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java) { #composeAndBind } + +## Running the sample + +The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide: + +* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-java +* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-scala + +As this service consumes events from the service built in the previous step, start the local-drone-control service first: + +@@@ div { .group-scala } + +```shell +sbt run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec +``` + +@@@ + +Then start the drone-restaurant-deliveries-service. + +As the service needs a PostgreSQL instance running, start that up in a docker container and create the database +schema: + +```shell +docker compose up --wait +docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql +``` + +Then start the service: + +@@@ div { .group-scala } + +```shell +sbt -Dconfig.resource=local1.conf run +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +sbt -Dconfig.resource=local2.conf run +sbt -Dconfig.resource=local3.conf run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec -DAPP_CONFIG=local1.conf +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +mvn compile exec:exec -DAPP_CONFIG=local2.conf +mvn compile exec:exec -DAPP_CONFIG=local3.conf +``` + +@@@ + + +Now update one or more drones a few times with [grpcurl](https://github.com/fullstorydev/grpcurl) against the local-drone-control: + +```shell +grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.08125, "latitude": 59.41834}, "altitude": 10}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.07125, "latitude": 59.41834}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.07125, "latitude": 59.41834}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.08114, "latitude": 59.42122}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation +``` + +Then query the cloud service: +```shell +grpcurl -d '{"location":"sweden/stockholm/kungsholmen"}' -plaintext localhost:8101 central.drones.DroneOverviewService/GetCoarseDroneLocations +``` + +If you posted the drone location updates above you should see the two drones listed at the same coarse grained coordinates. + +You can also query the individual drones for their specific coarse grained location: + +```shell +grpcurl -d '{"drone_id":"drone1"}' -plaintext localhost:8101 central.drones.DroneOverviewService.GetDroneOverview +``` + +## What's next? + +* Accept restaurant delivery orders in the restaurant-drone-deliveries-service \ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/guide/3-restaurant-deliveries-service.md b/akka-edge-docs/src/main/paradox/guide/3-restaurant-deliveries-service.md new file mode 100644 index 000000000..5695f25a1 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/3-restaurant-deliveries-service.md @@ -0,0 +1,153 @@ +# Restaurant deliveries + +In this section we will update the restaurant-drone-deliveries-service to accept and store restaurant delivery orders. + +![Diagram showing restaurant entities keeping track of deliveries in the cloud](../images/guide-section-3.svg) + +Just like for the drones in the first step of the guide we will represent each restaurant and its list of orders as +an @extref[Event Sourced Entity](akka:typed/persistence.html). + +## Implementing the restaurant entity + +### Commands and events + +The `RestaurantDeliveries` actor represents deliveries for one restaurant. It accepts the commands `SetUpRestaurant` to +initialize a restaurant with its location, and `RegisterDelivery` and `ListCurrentDeliveries` to add and inspect the +current deliveries of the restaurant: + +Scala +: @@snip [RestaurantDeliveries.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala) { #commands #events } + +Java +: @@snip [RestaurantDeliveries.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java) { #commands #events } + + +### State + +The state starts out as @java[`null`]@scala[`None`] and then, once the restaurant has been set up will be @scala[`Some` containing] an instance of `State`, +which contains the coordinates of the restaurant, the location id of the local-drone-control-service it is closest to, and a list of registered deliveries: + +Scala +: @@snip [RestaurantDeliveries.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala) { #state } + +Java +: @@snip [RestaurantDeliveries.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java) { #state } + + +### Command handler + +Since the `RestaurantDeliveries` can be started with no state, it has two different command handlers, one for no state, +where it only accepts the `SetUpRestaurant` command, and one where it accepts the delivery related commands: + +Scala +: @@snip [RestaurantDeliveries.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala) { #commandHandler } + +Java +: @@snip [RestaurantDeliveries.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java) { #commandHandler } + +### gRPC service + +To make it possible for users of our service to administer the available restaurants and their queued orders we define +a gRPC service with one endpoint for setting up restaurants and one to register a delivery for an already set up restaurant: + +Scala +: @@snip [drone_overview_api.proto](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto) { } + +Java +: @@snip [drone_overview_api.proto](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto) { } + +And implement the service interface Akka gRPC generates for it. + +The `setUpRestaurant` method first validates that the location id in the request is known, from a pre-defined set of locations in the application config. +Then sends a `SetUpRestaurant` command to the entity to set it up. + +Scala +: @@snip [RestaurantDeliveriesServiceImpl.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveriesServiceImpl.scala) { } + +Java +: @@snip [RestaurantDeliveriesServiceImpl.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java) { } + + + +## Running the sample + +The complete sample can be downloaded from GitHub, but note that it also includes the next step of the guide: + +* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-java +* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-scala + +In this step we created a local entity, so we can try it out by running the restaurant-drone-deliveries-service without +any local-drone-control services. + +To start the drone-restaurant-deliveries-service. + +As the service needs a PostgreSQL instance running, start that up in a docker container and create the database +schema (if you didn't do that in the previous guide step): + +```shell +docker compose up --wait +docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql +``` + +Then start the service: + +@@@ div { .group-scala } + +```shell +sbt -Dconfig.resource=local1.conf run +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +sbt -Dconfig.resource=local2.conf run +sbt -Dconfig.resource=local3.conf run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec -DAPP_CONFIG=local1.conf +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +mvn compile exec:exec -DAPP_CONFIG=local2.conf +mvn compile exec:exec -DAPP_CONFIG=local3.conf +``` + +@@@ + +Create a restaurant with [grpcurl](https://github.com/fullstorydev/grpcurl): + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Set up another restaurant, closest to a different local drone control + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Register a delivery for the first restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +Register a delivery for the second restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +## What's next? + + * Replicate the restaurant orders to the right local-drone-control PoP + * Let a drone pick up the closest waiting order from the local-drone-control \ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/guide/4-local-drone-delivery-selection.md b/akka-edge-docs/src/main/paradox/guide/4-local-drone-delivery-selection.md new file mode 100644 index 000000000..dbccd1d52 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/4-local-drone-delivery-selection.md @@ -0,0 +1,273 @@ +# Local Drone Delivery Selection + +In the previous step of the guide we implemented the means for the cloud service to keep track of restaurant and the +queue of registered deliveries for each. + +![Diagram showing delivery replication to the local drone control services](../images/guide-section-4.svg) + +We want to replicate the registered events to each local-drone-control PoP so that the drones close to it can pick up +orders and perform the deliveries. + +Again we will use @extref[Akka Projection gRPC](akka-projection:grpc.html) to do service-to-service events passing +without requiring a message broker in between services. + +We will then implement a service allowing the drones to ask the local-drone-control to assign them the closest waiting +delivery. + +## Replication of the delivery events + +First we must set up replication of the events from the restaurant-drone-deliveries-service. + +The regular @extref[Akka Projection gRPC](akka-projection:grpc.html) behavior is that the consumer connects to the +producer, in this case the local-drone-control being the consumer connecting to the cloud. + +To implement this we define an `EventProducerSource` and create a gRPC request handler for it. We use a protobuf message +that we transform the internal domain event `DeliveryRegistered` using a `Transformation`. Any other message type +is filtered out and not replicated to the consumers using the `orElseMapper`: + +Scala +: @@snip [DeliveryEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala) { } + +Java +: @@snip [DeliveryEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java) { } + + +The gRPC request handler is composed with the other gRPC handlers of the service into a single bound server: + +Scala +: @@snip [DroneDeliveriesServer.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala) { #composeAndBind } + +Java +: @@snip [DroneDeliveriesServer.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java) { #composeAndBind } + + +## Delivery queue actor + +The queue of all deliveries for one local-drone-control service is managed by a single durable state actor to keep things simple. + +For a high throughput of deliveries, a single actor might become a congestion point and a more clever scheme, for example partitioning +the deliveries into multiple queues based on the coarse grained coordinate of the restaurant, could make sense. + +### Commands and events + +The actor accepts the command `AddDelivery` to enqueue a delivery, the commands `RequestDelivery` and `CompleteDelivery` +for drones to pick up and complete deliveries and `GetCurrentState` for us to be able to inspect the current state of the queue: + +Scala +: @@snip [DeliveriesQueue.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala) { #commands } + +Java +: @@snip [DeliveriesQueue.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java) { #commands } + +### State + +In the state of the actor, we keep two lists, one is the waiting queue of deliveries, and one is the currently picked up +deliveries, waiting for the drone to report back once the delivery completed: + +Scala +: @@snip [DeliveriesQueue.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala) { #state } + +Java +: @@snip [DeliveriesQueue.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java) { #state } + +### Command handler + +The command handler de-duplicates orders by id for `AddDelivery` to avoid duplicates. + +When a `RequestDelivery` comes in, we first check that there are deliveries waiting, and if there are we find the one +where the restaurant is closest to the current location of the drone. We then move the delivery from the `waitingDeliveries` queue +to the `deliveriesInProgress` list, so that it is not selected again for another drone, and persist the state. + +For the `CompleteDelivery` command, the delivery is removed from the state and then the updated state is persisted. + +Scala +: @@snip [DeliveriesQueue.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala) { #commandHandler } + +Java +: @@snip [DeliveriesQueue.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java) { #commandHandler } + +## Consuming the delivery events + +To consume the stream of delivery events from the central cloud we need to set up a projection. We only want to consume +the events for the location id of the particular local-drone-control service, this is done through a consumer filter +first excluding all events and then selecting only the events for the configured location id: + +Scala +: @@snip [DeliveryEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala) { } + +Java +: @@snip [DeliveryEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveryEvents.java) { } + +## gRPC services + +### Drone deliveries + +The method for drones to select the next delivery, and to complete it are added to the existing drone service: + +Scala +: @@snip [local.drones.drone_api.proto](/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto) { } + +Java +: @@snip [local.drones.drone_api.proto](/samples/grpc/local-drone-control-java/src/main/protobuf/local/drones/drone_api.proto) { } + +Implementation of the generated service interface: + +Scala +: @@snip [DroneServiceImpl.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala) { } + +Java +: @@snip [DroneServiceImpl.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java) { } + + +### Inspecting the queue + +We add a new gRPC service for inspecting the current state of the queue: + +Scala +: @@snip [local.drones.deliveries_queue_api.proto](/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/deliveries_queue_api.proto) { } + +Java +: @@snip [local.drones.deliveries_queue_api.proto](/samples/grpc/local-drone-control-java/src/main/protobuf/local/drones/deliveries_queue_api.proto) { } + +Implementation of the generated service interface: + +Scala +: @@snip [DeliveriesQueueServiceImpl.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueueServiceImpl.scala) { } + +Java +: @@snip [DeliveriesQueueServiceImpl.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueueServiceImpl.java) { } + + +Finally, we need to start the gRPC server with the two services: + +Scala +: @@snip [LocalDroneControlServer.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala) { #composeAndBind } + +Java +: @@snip [LocalDroneControlServer.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java) { #composeAndBind } + +## Running the sample + +The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide: + +* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-java +* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-scala + +As this service consumes events from the service built in the previous step, start the local-drone-control service first: + +@@@ div { .group-scala } + +To start the local-drone-control-service: + +```shell +sbt run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec +``` + +@@@ + +Then start the drone-restaurant-deliveries-service. + +As the service needs a PostgreSQL instance running, start that up in a docker container and create the database +schema if you did not do that in a previous step of the guide: + +```shell +docker compose up --wait +docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql +``` + +Then start the service: + +@@@ div { .group-scala } + +```shell +sbt -Dconfig.resource=local1.conf run +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +sbt -Dconfig.resource=local2.conf run +sbt -Dconfig.resource=local3.conf run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec -DAPP_CONFIG=local1.conf +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +mvn compile exec:exec -DAPP_CONFIG=local2.conf +mvn compile exec:exec -DAPP_CONFIG=local3.conf +``` + +@@@ + +Create a restaurant with [grpcurl](https://github.com/fullstorydev/grpcurl): + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Set up another restaurant, closest to a different local drone control + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Register a delivery for the first restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +Register a delivery for the second restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +Now update one or more drones a few times with [grpcurl](https://github.com/fullstorydev/grpcurl) against the local-drone-control: + +```shell +grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.08125, "latitude": 59.41834}, "altitude": 10}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.08114, "latitude": 59.42122}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation +``` + +Request a delivery for drone1 + +```shell +grpcurl -d '{"drone_id":"drone1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.RequestNextDelivery +``` + +Mark the delivery as completed +```shell +grpcurl -d '{"delivery_id":"order1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.CompleteDelivery +``` + +Inspect the current state of the local delivery queue + +```shell +grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue +``` + +## What's next? + +* Packaging up the two services for deployment \ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/guide/5-deploying-delivery-service.md b/akka-edge-docs/src/main/paradox/guide/5-deploying-delivery-service.md new file mode 100644 index 000000000..8dc719b51 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/5-deploying-delivery-service.md @@ -0,0 +1,5 @@ +# Deploying the Restaurant Delivery Service + +FIXME Outline: + + * normal k8 stuff, like the dist cluster guide diff --git a/akka-edge-docs/src/main/paradox/guide/6-deploying-local-drone-control-service.md b/akka-edge-docs/src/main/paradox/guide/6-deploying-local-drone-control-service.md new file mode 100644 index 000000000..9e71109fb --- /dev/null +++ b/akka-edge-docs/src/main/paradox/guide/6-deploying-local-drone-control-service.md @@ -0,0 +1,5 @@ +# Deploying the Local Drone Control Service + +FIXME Outline: + + * the big unknown, or is it just docker k8s (k3s)? \ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-1.drawio b/akka-edge-docs/src/main/paradox/images/guide-section-1.drawio new file mode 100644 index 000000000..243b86dd6 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-1.drawio @@ -0,0 +1,68 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-1.svg b/akka-edge-docs/src/main/paradox/images/guide-section-1.svg new file mode 100644 index 000000000..867590a84 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-1.svg @@ -0,0 +1,3 @@ + + +
event
journal (H2)
event...
drone1
drone1
drone2
drone2
Local Drone Control
Local Drone Control
gRPC
gRPC
Text is not SVG - cannot display
\ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-2.drawio b/akka-edge-docs/src/main/paradox/images/guide-section-2.drawio new file mode 100644 index 000000000..66c1c1c65 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-2.drawio @@ -0,0 +1,161 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-2.svg b/akka-edge-docs/src/main/paradox/images/guide-section-2.svg new file mode 100644 index 000000000..7faa85dbd --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-2.svg @@ -0,0 +1,3 @@ + + +
event
journal (H2)
event...
drone1
drone1
drone2
drone2
Local Drone Control
Local Drone Control
gRPC
gRPC
journal/
state
store
journal/...
Consumer
Consumer
overview
drone1
overview...
overview
drone2
overview...
Projection
consuming events
Projection...
Restaurant Drone Deliveries Service
Restaurant Drone Deliveries Service
Producer
Producer
gRPC
gRPC
Text is not SVG - cannot display
\ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-3.drawio b/akka-edge-docs/src/main/paradox/images/guide-section-3.drawio new file mode 100644 index 000000000..287b9ecd2 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-3.drawio @@ -0,0 +1,61 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-3.svg b/akka-edge-docs/src/main/paradox/images/guide-section-3.svg new file mode 100644 index 000000000..80b9e90df --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-3.svg @@ -0,0 +1,3 @@ + + +
journal
journal
restaurant2
restaurant2
restaurant1
restaurant1
Restaurant Drone Deliveries Service
Restaurant Drone Deliveries Service
gRPC
gRPC
Text is not SVG - cannot display
\ No newline at end of file diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-4.drawio b/akka-edge-docs/src/main/paradox/images/guide-section-4.drawio new file mode 100644 index 000000000..1fe3c5d1f --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-4.drawio @@ -0,0 +1,173 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-edge-docs/src/main/paradox/images/guide-section-4.svg b/akka-edge-docs/src/main/paradox/images/guide-section-4.svg new file mode 100644 index 000000000..4ab15c601 --- /dev/null +++ b/akka-edge-docs/src/main/paradox/images/guide-section-4.svg @@ -0,0 +1,3 @@ + + +
event
journal (H2)
event...
Local Drone Control
Local Drone Control
gRPC
gRPC
journal/
state
store
journal/...
Producer
Producer
restaurant2
restaurant2
restaurant1
restaurant1
Restaurant Drone Deliveries Service
Restaurant Drone Deliveries Service
Consumer
Consumer
gRPC
gRPC
drone2
drone2
drone1
drone1
DeliveriesQueue
DeliveriesQueue
gRPC
gRPC
Text is not SVG - cannot display
\ No newline at end of file diff --git a/build.sbt b/build.sbt index d8ffd8251..746ba235c 100644 --- a/build.sbt +++ b/build.sbt @@ -289,9 +289,12 @@ lazy val `akka-edge-docs` = project "github.base_url" -> "https://github.com/akka/akka-projection", "akka.version" -> Dependencies.Versions.akka, "akka.r2dbc.version" -> Dependencies.Versions.akkaPersistenceR2dbc, + "h2.version" -> Dependencies.Compile.h2.revision, + "r2dbc-h2.version" -> Dependencies.Compile.r2dbcH2.revision, "extref.akka-projection.base_url" -> s"https://doc.akka.io/docs/akka-projection/${Dependencies.AkkaProjectionVersionInDocs}/%s", "scaladoc.akka.projection.base_url" -> s"https://doc.akka.io/api/akka-projection/${Dependencies.AkkaProjectionVersionInDocs}/", - "extref.akka-distributed-cluster.base_url" -> s"https://doc.akka.io/docs/akka-distributed-cluster/${Dependencies.AkkaProjectionVersionInDocs}/%s"), + "extref.akka-distributed-cluster.base_url" -> s"https://doc.akka.io/docs/akka-distributed-cluster/${Dependencies.AkkaProjectionVersionInDocs}/%s", + "extref.akka-persistence-r2dbc.base_url" -> s"https://doc.akka.io/docs/akka-persistence-r2dbc/${Dependencies.AkkaPersistenceR2dbcVersionInDocs}/%s"), paradoxGroups := Map("Language" -> Seq("Java", "Scala")), paradoxRoots := List("index.html"), resolvers += Resolver.jcenterRepo, diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java index 244e602f4..eb0416995 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveriesQueue.java @@ -23,6 +23,7 @@ public final class DeliveriesQueue extends DurableStateBehavior { + // #commands public interface Command extends CborSerializable { } @@ -67,9 +68,10 @@ public GetCurrentState(ActorRef replyTo) { } } + // #commands - + // #state public static final class State implements CborSerializable { public final List waitingDeliveries; public final List deliveriesInProgress; @@ -137,6 +139,7 @@ public DeliveryInProgress(String deliveryId, String droneId, Instant pickupTime) this.pickupTime = pickupTime; } } + // #state // Not really an entity, we just have a single instance static public final EntityTypeKey EntityKey = EntityTypeKey.create(Command.class, "RestaurantDeliveries"); @@ -159,6 +162,7 @@ public State emptyState() { return new State(); } + // #commandHandler @Override public CommandHandler commandHandler() { return newCommandHandlerBuilder() @@ -221,4 +225,5 @@ private Effect onGetCurrentState(State state, GetCurrentState command) { var stateToShare = new State(new ArrayList<>(state.waitingDeliveries), new ArrayList<>(state.deliveriesInProgress)); return Effect().reply(command.replyTo, stateToShare); } + // #commandHandler } \ No newline at end of file diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveryEvents.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveryEvents.java index 7eedbdd8e..372d00b33 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveryEvents.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DeliveryEvents.java @@ -32,22 +32,22 @@ public static Behavior projectionBehavior( Settings settings) { var projectionName = "delivery-events"; + var eventsBySlicesQuery = + GrpcReadJournal.create( + system, + Arrays.asList(central.deliveries.proto.DeliveryEvents.getDescriptor())); + + // initial consumer topic filter for location id - // FIXME no docs of setting up initial consumer filter, am I missing some API? - // Async setup is a race condition but maybe ok? Does not seem to quite work, all or the wrong events are delivered + // FIXME replace with initial consumer filter once 1.5.0-M4 is out ConsumerFilter.get(system).ref().tell(new ConsumerFilter.UpdateFilter( - // FIXME stream-id duplicated in config - "delivery-events", + eventsBySlicesQuery.streamId(), // location id already is in the format of a topic filter expression Arrays.asList( new ConsumerFilter.ExcludeRegexEntityIds(Collections.singleton(".*")), new ConsumerFilter.IncludeTopics(Collections.singleton(settings.locationId))) )); - var eventsBySlicesQuery = - GrpcReadJournal.create( - system, - Arrays.asList(central.deliveries.proto.DeliveryEvents.getDescriptor())); // single projection handling all slices var sliceRanges = diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java index 624a4e9c0..b682f4987 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java @@ -24,8 +24,18 @@ public class Drone extends EventSourcedBehavior { + // #commands + /** + * This interface defines all the commands (messages) that the Drone actor supports. + */ interface Command extends CborSerializable {} + /** + * A command to report the current position (coordinates and altitude) of the drone. + * + * It replies with `Done`, which is sent back to the caller when + * all the events emitted by this command are successfully persisted. + */ public static final class ReportPosition implements Command { public final Position position; public final ActorRef replyTo; @@ -36,6 +46,12 @@ public ReportPosition(Position position, ActorRef replyTo) { } } + /** + * A command to query the current position (coordinates and altitude) of the drone. + * + * It replies with a `StatusReply<Position>`, which is sent back to the caller as a success if the + * coordinates are known. If not an error is sent back. + */ public static final class GetCurrentPosition implements Command { public final ActorRef> replyTo; @@ -44,7 +60,12 @@ public GetCurrentPosition(ActorRef> replyTo) { } } + // #commands + // #events + /** + * This interface defines all the events that the Drone supports. + */ interface Event extends CborSerializable {} public static final class PositionUpdated implements Event { @@ -64,9 +85,9 @@ public CoarseGrainedLocationChanged(CoarseGrainedCoordinates coordinates) { this.coordinates = coordinates; } } + // #events - - + // #state class State implements CborSerializable { Optional currentPosition; final List historicalPositions; @@ -82,6 +103,7 @@ Optional coarseGrainedCoordinates() { } } + // #state public static final EntityTypeKey ENTITY_KEY = EntityTypeKey.create(Command.class, "Drone"); @@ -105,6 +127,7 @@ public State emptyState() { return new State(); } + // #commandHandler @Override public CommandHandler commandHandler() { return newCommandHandlerBuilder().forAnyState() @@ -113,8 +136,10 @@ public CommandHandler commandHandler() { .build(); } + // #commandHandler + // #eventHandler @Override public EventHandler eventHandler() { return newEventHandlerBuilder() @@ -136,7 +161,10 @@ public EventHandler eventHandler() { state) .build(); } + // #eventHandler + + // #commandHandler private Effect onReportPosition(State state, ReportPosition command) { if (state.currentPosition.equals(Optional.of(command.position))) { // already seen @@ -165,6 +193,7 @@ private Effect onGetCurrentPosition(State state, GetCurrentPositio .orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown")) ); } + // #commandHandler } diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java index bbde133d2..21d319ecc 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java @@ -57,6 +57,7 @@ public CompletionStage reportLocation(ReportLocationRequ return convertError(response); } + // #requestNextDelivery @Override public CompletionStage requestNextDelivery(RequestNextDeliveryRequest in) { logger.info("Drone {} requesting next delivery", in.getDroneId()); @@ -86,7 +87,7 @@ public CompletionStage requestNextDelivery(RequestN return convertError(response); } - + // #requestNextDelivery @Override public CompletionStage completeDelivery(CompleteDeliveryRequest in) { logger.info("Delivery {} completed", in.getDeliveryId()); diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/LocalDroneControlServer.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/LocalDroneControlServer.java index e7038865f..3deb63c5a 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/LocalDroneControlServer.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/LocalDroneControlServer.java @@ -14,6 +14,7 @@ public class LocalDroneControlServer { + // #bind public static void start(String host, int port, ActorSystem system, @@ -46,4 +47,5 @@ public static void start(String host, } }); } + // #bind } diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Main.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Main.java index 0a0dae0d0..4474fd7e1 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Main.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Main.java @@ -9,6 +9,7 @@ public class Main { + // #main public static void main(String[] args) { ActorSystem.create(rootBehavior(), "local-drone-control"); } @@ -55,4 +56,5 @@ private static Behavior rootBehavior() { return Behaviors.empty(); }); } + // #main } diff --git a/samples/grpc/local-drone-control-java/src/main/resources/grpc.conf b/samples/grpc/local-drone-control-java/src/main/resources/grpc.conf index ca5a3bcd5..a2b5317ec 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/grpc.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/grpc.conf @@ -1,4 +1,6 @@ +// #http2 akka.http.server.enable-http2 = on +// #http2 local-drone-control { # gRPC server for drones to talk to local service diff --git a/samples/grpc/local-drone-control-java/src/main/resources/persistence.conf b/samples/grpc/local-drone-control-java/src/main/resources/persistence.conf index 2604a56a5..4d27f6e45 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/persistence.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/persistence.conf @@ -18,8 +18,11 @@ akka { connection-factory { additional-init = ${akka.projection.r2dbc.default-h2-schema} protocol = "file" - # for running from SBT but keeping state across runs + # database name is full path to database + # default for running from SBT but keeping state across runs database = "./target/drone-db" + # inject a path to a persistent volume when running in a container + database = ${?H2_DATABASE_PATH} } } } diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index b1710243f..0961430d2 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -62,7 +62,6 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion, "com.lightbend.akka" %% "akka-persistence-r2dbc" % AkkaPersistenceR2dbcVersion, "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test, - // FIXME should we use a "real" db? // local single-node lightweight database with h2 "com.h2database" % "h2" % "2.1.210", "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE", diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf index ca5a3bcd5..a2b5317ec 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf @@ -1,4 +1,6 @@ +// #http2 akka.http.server.enable-http2 = on +// #http2 local-drone-control { # gRPC server for drones to talk to local service diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf index 2604a56a5..4d27f6e45 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf @@ -18,8 +18,11 @@ akka { connection-factory { additional-init = ${akka.projection.r2dbc.default-h2-schema} protocol = "file" - # for running from SBT but keeping state across runs + # database name is full path to database + # default for running from SBT but keeping state across runs database = "./target/drone-db" + # inject a path to a persistent volume when running in a container + database = ${?H2_DATABASE_PATH} } } } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala index d4d4cd987..3d8c8c473 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala @@ -12,6 +12,7 @@ import java.time.Instant object DeliveriesQueue { + // #commands sealed trait Command extends CborSerializable final case class AddDelivery( @@ -32,6 +33,9 @@ object DeliveriesQueue { final case class GetCurrentState(replyTo: ActorRef[State]) extends Command + // #commands + + // #state final case class WaitingDelivery( deliveryId: String, from: Coordinates, @@ -45,6 +49,7 @@ object DeliveriesQueue { waitingDeliveries: Vector[WaitingDelivery], deliveriesInProgress: Vector[DeliveryInProgress]) extends CborSerializable + // #state // Not really an entity, we just have one val EntityKey = EntityTypeKey("RestaurantDeliveries") @@ -58,6 +63,7 @@ object DeliveriesQueue { } } + // #commandHandler private def onCommand(context: ActorContext[Command])( state: State, command: Command): Effect[State] = @@ -112,5 +118,6 @@ object DeliveriesQueue { case GetCurrentState(replyTo) => Effect.reply(replyTo)(state) } + // #commandHandler } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala index 27128165e..863a8b040 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala @@ -24,21 +24,19 @@ object DeliveryEvents { implicit val timeout: Timeout = settings.askTimeout + val eventsBySlicesQuery = + GrpcReadJournal( + List(central.deliveries.proto.DeliveryEventsProto.javaDescriptor)) + // initial consumer topic filter for location id - // FIXME no docs of setting up initial consumer filter, am I missing some API? - // Async setup is a race condition but maybe ok? Does not seem to quite work, all or the wrong events are delivered + // FIXME replace with initial consumer filter once 1.5.0-M4 is out ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter( - // FIXME stream-id duplicated in config - "delivery-events", + eventsBySlicesQuery.streamId, // location id already is in the format of a topic filter expression Vector( ConsumerFilter.ExcludeRegexEntityIds(Set(".*")), ConsumerFilter.IncludeTopics(Set(settings.locationId)))) - val eventsBySlicesQuery = - GrpcReadJournal( - List(central.deliveries.proto.DeliveryEventsProto.javaDescriptor)) - // single projection handling all slices val sliceRanges = Persistence(system).sliceRanges(1) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala index ac2afbf47..96d99e72f 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala @@ -14,20 +14,47 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.ReplyEffect object Drone { + + // #commands + + /** + * This interface defines all the commands (messages) that the Drone actor supports. + */ sealed trait Command extends CborSerializable + /** + * A command to report the current position (coordinates and altitude) of the drone. + * + * It replies with `Done`, which is sent back to the caller when + * all the events emitted by this command are successfully persisted. + */ final case class ReportPosition(position: Position, replyTo: ActorRef[Done]) extends Command + /** + * A command to query the current position (coordinates and altitude) of the drone. + * + * It replies with a `StatusReply[Position]`, which is sent back to the caller as a success if the + * coordinates are known. If not an error is sent back. + */ final case class GetCurrentPosition(replyTo: ActorRef[StatusReply[Position]]) extends Command + // #commands + + // #events + + /** + * This interface defines all the events that the Drone supports. + */ sealed trait Event extends CborSerializable final case class PositionUpdated(position: Position) extends Event final case class CoarseGrainedLocationChanged( coordinates: CoarseGrainedCoordinates) extends Event + // #events + // #state final case class State( currentPosition: Option[Position], historicalPositions: Vector[Position]) @@ -36,6 +63,7 @@ object Drone { currentPosition.map(p => CoarseGrainedCoordinates.fromCoordinates(p.coordinates)) } + // #state private val emptyState = State(None, Vector.empty) @@ -56,6 +84,7 @@ object Drone { handleCommand, handleEvent) + // #commandHandler private def handleCommand( state: State, command: Command): ReplyEffect[Event, State] = command match { @@ -92,6 +121,9 @@ object Drone { } + // #commandHandler + + // #eventHandler private def handleEvent(state: State, event: Event): State = event match { case PositionUpdated(newPosition) => val newHistoricalPositions = state.currentPosition match { @@ -111,5 +143,6 @@ object Drone { state } + // #eventHandler } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala index 1d5942653..15610481b 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala @@ -49,6 +49,7 @@ class DroneServiceImpl( convertError(response) } + // #requestNextDelivery override def requestNextDelivery(in: proto.RequestNextDeliveryRequest) : Future[proto.RequestNextDeliveryResponse] = { logger.info("Drone {} requesting next delivery", in.droneId) @@ -71,7 +72,7 @@ class DroneServiceImpl( convertError(response) } - + // #requestNextDelivery override def completeDelivery( in: proto.CompleteDeliveryRequest): Future[Empty] = { logger.info("Delivery {} completed", in.deliveryId) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala index 1ffb68660..fda439b64 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala @@ -14,7 +14,7 @@ import scala.util.Failure import scala.util.Success object LocalDroneControlServer { - + // #bind def start( interface: String, port: Int, @@ -51,5 +51,6 @@ object LocalDroneControlServer { system.terminate() } } + // #bind } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala index 7ae7da5da..7d7421cb6 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala @@ -4,12 +4,11 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.typed.Cluster import akka.cluster.typed.Join -import org.slf4j.LoggerFactory object Main { - val logger = LoggerFactory.getLogger("drones.Main") + // #main def main(args: Array[String]): Unit = { ActorSystem[Nothing](rootBehavior(), "local-drone-control") } @@ -55,4 +54,6 @@ object Main { Behaviors.empty } + // #main + } diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql index 94c2e71cc..470136fd3 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql +++ b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql @@ -39,6 +39,7 @@ CREATE TABLE IF NOT EXISTS snapshot( PRIMARY KEY(persistence_id) ); +-- // #queryableColumn CREATE TABLE IF NOT EXISTS durable_state ( slice INT NOT NULL, entity_type VARCHAR(255) NOT NULL, @@ -56,6 +57,7 @@ CREATE TABLE IF NOT EXISTS durable_state ( PRIMARY KEY(persistence_id, revision) ); +-- // #queryableColumn -- to query drones by location CREATE INDEX IF NOT EXISTS durable_state_drone_location_idx ON durable_state(location); diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java index 9eca73d93..72415d8c9 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java @@ -30,6 +30,7 @@ public static void start( Function> deliveryEventsProducerService, Function> pushedDroneEventsHandler) { + // #composeAndBind @SuppressWarnings("unchecked") var service = ServiceHandler.concatOrNotFound( @@ -44,6 +45,8 @@ public static void start( pushedDroneEventsHandler); var bound = Http.get(system).newServerAt(host, port).bind(service); + // #composeAndBind + bound.whenComplete( (binding, error) -> { if (error == null) { diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java index b81aac8a8..5fcdd2c24 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java @@ -24,6 +24,8 @@ public final class RestaurantDeliveries extends EventSourcedBehavior< RestaurantDeliveries.Command, RestaurantDeliveries.Event, RestaurantDeliveries.State> { + + // #commands public interface Command extends CborSerializable {} public static final class SetUpRestaurant implements Command { @@ -62,7 +64,9 @@ public ListCurrentDeliveries(ActorRef> replyTo) { this.replyTo = replyTo; } } + // #commands + // #events public interface Event extends CborSerializable {} public static final class RestaurantLocationSet implements Event { @@ -84,21 +88,6 @@ public DeliveryRegistered(Delivery delivery) { } } - public static final class State implements CborSerializable { - public final String localControlLocationId; - public final Coordinates restaurantLocation; - public final List currentDeliveries; - - public State( - String localControlLocationId, - Coordinates restaurantLocation, - List currentDeliveries) { - this.localControlLocationId = localControlLocationId; - this.restaurantLocation = restaurantLocation; - this.currentDeliveries = currentDeliveries; - } - } - public static final class Delivery { public final String deliveryId; // The following two fields always the same for the same restaurant, so that they can be seen in @@ -121,6 +110,24 @@ public Delivery( this.timestamp = timestamp; } } + // #events + + // #state + public static final class State implements CborSerializable { + public final String localControlLocationId; + public final Coordinates restaurantLocation; + public final List currentDeliveries; + + public State( + String localControlLocationId, + Coordinates restaurantLocation, + List currentDeliveries) { + this.localControlLocationId = localControlLocationId; + this.restaurantLocation = restaurantLocation; + this.currentDeliveries = currentDeliveries; + } + } + // #state public static final EntityTypeKey ENTITY_KEY = EntityTypeKey.create(Command.class, "RestaurantDeliveries"); @@ -153,6 +160,7 @@ public State emptyState() { return null; } + // #commandHandler @Override public CommandHandler commandHandler() { var noStateHandler = @@ -226,6 +234,7 @@ private Effect onSetUpRestaurant(SetUpRestaurant command) { new RestaurantLocationSet(command.localControlLocationId, command.restaurantLocation)) .thenReply(command.replyTo, updatedState -> StatusReply.ack()); } + // #commandHandler @Override public EventHandler eventHandler() { diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java index 17c554edf..dd2b87cf6 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java @@ -21,6 +21,7 @@ public final class Drone extends DurableStateBehavior { + // #commands interface Command extends CborSerializable {} public static final class UpdateLocation implements Command { @@ -46,7 +47,9 @@ public GetState(ActorRef replyTo) { this.replyTo = replyTo; } } + // #commands + // #state public static final class State implements CborSerializable { public String locationName; public Optional currentLocation; @@ -61,6 +64,7 @@ public State( this.lastChange = lastChange; } } + // #state public static final EntityTypeKey ENTITY_KEY = EntityTypeKey.create(Command.class, "CentralDrone"); @@ -81,11 +85,14 @@ private Drone(ActorContext context, String entityId) { this.context = context; } + // #emptyState @Override public State emptyState() { return new State("unknown", Optional.empty(), Instant.EPOCH); } + // #emptyState + // #commandHandler @Override public CommandHandler commandHandler() { return newCommandHandlerBuilder() @@ -114,4 +121,5 @@ private Effect onUpdateLocation(State state, UpdateLocation command) { state.lastChange = Instant.now(); return Effect().persist(state).thenReply(command.replyTo, updatedState -> StatusReply.ack()); } + // #commandHandler } diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java index e03d34bb2..2e5a9bd97 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java @@ -45,6 +45,7 @@ public DroneOverviewServiceImpl(ActorSystem system) { @Override public CompletionStage getDroneOverview(GetDroneOverviewRequest in) { + // FIXME missing impl return null; } diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java index 554e0496e..e1d9f94cc 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java @@ -33,6 +33,7 @@ public final class LocalDroneEvents { private static final Logger logger = LoggerFactory.getLogger(LocalDroneEvents.class); + // #eventConsumer // Note: stream id used in producer for the drone events public static final String DRONE_EVENT_STREAM_ID = "drone-events"; @@ -56,7 +57,9 @@ public static Function> pushedEventsG return EventProducerPushDestination.grpcServiceHandler(destination, system); } + // #eventConsumer + // #eventProjection private static class LocationHandler extends Handler> { private final ClusterSharding sharding; @@ -143,4 +146,5 @@ private static Projection> projection( return R2dbcProjection.atLeastOnceAsync( projectionId, Optional.empty(), sourceProvider, () -> new LocationHandler(system), system); } + // #eventProjection } diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf index 163bd253a..b04ca492f 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf @@ -17,11 +17,13 @@ akka { password = ${?DB_PASSWORD} } - state = { + // #locationColumn + state { additional-columns { "CentralDrone" = ["central.drones.LocationColumn"] } } + // #locationColumn # FIXME disabled until we have fixed handling of FilteredEvents for publishing in r2dbc journal.publish-events = off diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md index d00da5e86..01d439ea8 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md @@ -5,7 +5,7 @@ The sample show-cases a service for drones doing restaurant deliveries. It is intended to be used together with the local-drone-control sample. * Keeps track of a coarse grained location of each drone to the cloud -* FIXME Accepts restaurant delivery requests which are then fed to the right local drone control +* Accepts restaurant delivery requests which are then fed to the right local drone control ## Running the sample code diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/ddl-scripts/create_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-scala/ddl-scripts/create_tables.sql index 94c2e71cc..470136fd3 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/ddl-scripts/create_tables.sql +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/ddl-scripts/create_tables.sql @@ -39,6 +39,7 @@ CREATE TABLE IF NOT EXISTS snapshot( PRIMARY KEY(persistence_id) ); +-- // #queryableColumn CREATE TABLE IF NOT EXISTS durable_state ( slice INT NOT NULL, entity_type VARCHAR(255) NOT NULL, @@ -56,6 +57,7 @@ CREATE TABLE IF NOT EXISTS durable_state ( PRIMARY KEY(persistence_id, revision) ); +-- // #queryableColumn -- to query drones by location CREATE INDEX IF NOT EXISTS durable_state_drone_location_idx ON durable_state(location); diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf index 142e60516..f539ced3e 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/persistence.conf @@ -17,11 +17,13 @@ akka { password = ${?DB_PASSWORD} } - state = { + // #locationColumn + state { additional-columns { "CentralDrone" = ["central.drones.LocationColumn"] } } + // #locationColumn # only time stamp based offsets used offset-store.offset-table="" diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala index d40796381..7abb01677 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala @@ -31,6 +31,7 @@ object DroneDeliveriesServer { Future[HttpResponse]])(implicit system: ActorSystem[_]): Unit = { import system.executionContext + // #composeAndBind val service = ServiceHandler.concatOrNotFound( DroneOverviewServiceHandler.partial(droneOverviewService), RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), @@ -41,6 +42,7 @@ object DroneDeliveriesServer { pushedDroneEventsHandler) val bound = Http(system).newServerAt(interface, port).bind(service) + // #composeAndBind bound.foreach(binding => logger.info( "Drone event consumer listening at: {}:{}", diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala index 72da01e90..560acf447 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala @@ -1,6 +1,3 @@ -/* - * Copyright (C) 2009-2023 Lightbend Inc. - */ package central.deliveries import akka.actor.typed.ActorSystem diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala index bad4fc93a..54f4be2f6 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala @@ -21,6 +21,7 @@ import java.time.Instant */ object RestaurantDeliveries { + // #commands sealed trait Command extends CborSerializable final case class RegisterDelivery( @@ -37,7 +38,9 @@ object RestaurantDeliveries { final case class ListCurrentDeliveries(replyTo: ActorRef[Seq[Delivery]]) extends Command + // #commands + // #events sealed trait Event extends CborSerializable final case class DeliveryRegistered(delivery: Delivery) extends Event @@ -46,18 +49,22 @@ object RestaurantDeliveries { coordinates: Coordinates) extends Event + final case class Delivery( + deliveryId: String, + // FIXME next two fields always the same for the same restaurant, annoying, + // but how else would we see them in downstream projection? + localControlLocationId: String, + origin: Coordinates, + destination: Coordinates, + timestamp: Instant) + // #events + + // #state private final case class State( localControlLocationId: String, restaurantLocation: Coordinates, currentDeliveries: Vector[Delivery]) - final case class Delivery( - deliveryId: String, - // FIXME next two fields always the same for the same restaurant, annoying, - // but how else would we see them in downstream projection? - localControlLocationId: String, - origin: Coordinates, - destination: Coordinates, - timestamp: Instant) + // #state val EntityKey = EntityTypeKey[Command]("RestaurantDeliveries") @@ -79,6 +86,7 @@ object RestaurantDeliveries { case _ => Set.empty } + // #commandHandler private def onCommand( state: Option[State], command: Command): Effect[Event, Option[State]] = @@ -132,6 +140,7 @@ object RestaurantDeliveries { StatusReply.Error("Changing restaurant location not supported")) } } + // #commandHandler private def onEvent(state: Option[State], event: Event): Option[State] = (state, event) match { diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala index 2d143dcb4..5e6bab37b 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala @@ -20,6 +20,7 @@ import java.time.Instant */ object Drone { + // #commands sealed trait Command final case class UpdateLocation( @@ -29,16 +30,21 @@ object Drone { extends Command final case class GetState(replyTo: ActorRef[State]) extends Command + // #commands val EntityKey = EntityTypeKey[Command]("CentralDrone") + // #state final case class State( locationName: String, currentLocation: Option[CoarseGrainedCoordinates], lastChange: Instant) extends CborSerializable + // #state + // #emptyState private val emptyState = State("unknown", None, Instant.EPOCH) + // #emptyState def init(system: ActorSystem[_]): Unit = { ClusterSharding(system).init(Entity(EntityKey)(entityContext => @@ -54,6 +60,7 @@ object Drone { } } + // #commandHandler private def onCommand(context: ActorContext[Command])( state: State, command: Command): Effect[State] = @@ -73,9 +80,11 @@ object Drone { case GetState(replyTo) => Effect.reply(replyTo)(state) } + // #commandHandler } +// #locationColumn /** * Write local drone control location name column for querying drone locations per control location */ @@ -88,3 +97,4 @@ final class LocationColumn extends AdditionalColumn[Drone.State, String] { AdditionalColumn.BindValue(upsert.value.locationName) } +// #locationColumn \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala index 1649a8e52..2cb62fefc 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala @@ -27,6 +27,7 @@ import scala.jdk.DurationConverters.JavaDurationOps */ object LocalDroneEvents { + // #eventConsumer val DroneEventStreamId = "drone-events" // FIXME The type key on the producer side. Make sure we have documented it. @@ -48,7 +49,10 @@ object LocalDroneEvents { .grpcServiceHandler(destination)(system) .asInstanceOf[PartialFunction[HttpRequest, Future[HttpResponse]]] } + // #eventConsumer + + // #eventProjection def initPushedEventsConsumer(implicit system: ActorSystem[_]): Unit = { implicit val askTimeout: Timeout = system.settings.config @@ -130,7 +134,6 @@ object LocalDroneEvents { numberOfInstances = sliceRanges.size, behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))), stopMessage = ProjectionBehavior.Stop) - } - + // #eventProjection }