Skip to content

Commit

Permalink
sample: Expand IoT sample with consumer of temperature events (#973)
Browse files Browse the repository at this point in the history
* using producer push from edge
  • Loading branch information
patriknw authored Aug 30, 2023
1 parent bd40f50 commit fdb2742
Show file tree
Hide file tree
Showing 17 changed files with 519 additions and 127 deletions.
12 changes: 9 additions & 3 deletions samples/grpc/iot-service-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
curl http://localhost:9101/ready
```

6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl).
6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Register a sensor:

```shell
# register
grpcurl -d '{"sensor_id":"temp-1", "secret":"foo"}' -plaintext 127.0.0.1:8101 iot.registration.RegistrationService.Register
```

Expand All @@ -46,5 +45,12 @@
7. Consume events from edge:

```shell
sbt "Test/runMain iot.registration.EdgeApp"
sbt "Test/runMain iot.EdgeApp"
```

8. After registration of a sensor the EdgeApp will simulate temperature readings, which will be consumed by the iot-service. Read current temperature, which should be updated with a random value:

9. ```shell
grpcurl -d '{"sensor_id":"temp-1"}' -plaintext 127.0.0.1:8101 iot.temperature.SensorTwinService.GetTemperature
```

5 changes: 3 additions & 2 deletions samples/grpc/iot-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")
scalaVersion := "2.13.11"

Compile / scalacOptions ++= Seq(
"-target:11",
"-release:11",
"-deprecation",
"-feature",
"-unchecked",
Expand All @@ -31,7 +31,7 @@ val AkkaHttpVersion = "10.5.2"
val AkkaManagementVersion = "1.4.1"
val AkkaPersistenceR2dbcVersion = "1.2.0-M3"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M2")
sys.props.getOrElse("akka-projection.version", "1.5.0-M3")
val AkkaDiagnosticsVersion = "2.0.1"

enablePlugins(AkkaGrpcPlugin)
Expand Down Expand Up @@ -67,6 +67,7 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
"com.lightbend.akka" %% "akka-persistence-r2dbc" % AkkaPersistenceR2dbcVersion,
"io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE" % Test,
"com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test,
// 3. Querying and publishing data from Akka Persistence
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "iot.temperature.proto";

package iot.temperature;

// gRPC definition for SensorTwinService

service SensorTwinService {
rpc GetTemperature (GetTemperatureRequest) returns (CurrentTemperature) {}
}

message GetTemperatureRequest {
string sensor_id = 1;
}

message CurrentTemperature {
string sensor_id = 1;
int32 temperature = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "iot.temperature.proto";

package iot.temperature;

// Events consumed from external services

message TemperatureRead {
string sensor_id = 1;
int32 temperature = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ akka.projection.grpc {

iot-service {
ask-timeout = 5 s
temperature.projections-slice-count = 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ akka {
snapshot-store {
plugin = "akka.persistence.r2dbc.snapshot"
}
state {
plugin = "akka.persistence.r2dbc.state"
}

r2dbc {
connection-factory = ${akka.persistence.r2dbc.postgres}
connection-factory {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
akka.actor.serialization-bindings {
"iot.CborSerializable" = jackson-cbor
"scalapb.GeneratedMessage" = proto
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iot.registration
package iot

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -13,25 +13,36 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse

object RegistrationServer {
object IotServer {

def start(
interface: String,
port: Int,
system: ActorSystem[_],
grpcService: proto.RegistrationService,
eventProducerService: PartialFunction[HttpRequest, Future[HttpResponse]])
: Unit = {
registrationService: registration.proto.RegistrationService,
registrationEventProducerService: PartialFunction[
HttpRequest,
Future[HttpResponse]],
pushedTemperatureEventsHandler: PartialFunction[
HttpRequest,
Future[HttpResponse]],
sensorTwinService: temperature.proto.SensorTwinService): Unit = {
implicit val sys: ActorSystem[_] = system
implicit val ec: ExecutionContext =
system.executionContext

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(
eventProducerService,
proto.RegistrationServiceHandler.partial(grpcService),
registrationEventProducerService,
registration.proto.RegistrationServiceHandler
.partial(registrationService),
pushedTemperatureEventsHandler,
temperature.proto.SensorTwinServiceHandler.partial(sensorTwinService),
// ServerReflection enabled to support grpcurl without import-path and proto parameters
ServerReflection.partial(List(proto.RegistrationService)))
ServerReflection.partial(
List(
registration.proto.RegistrationService,
temperature.proto.SensorTwinService)))

val bound =
Http()
Expand All @@ -43,7 +54,7 @@ object RegistrationServer {
case Success(binding) =>
val address = binding.localAddress
system.log.info(
"Registration online at gRPC server {}:{}",
"IoT Service online at gRPC server {}:{}",
address.getHostString,
address.getPort)
case Failure(ex) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package iot.registration
package iot

import scala.util.control.NonFatal

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import iot.registration.RegistrationEvents
import iot.registration.Registration
import iot.registration.RegistrationServiceImpl
import iot.temperature.SensorTwin
import iot.temperature.SensorTwinServiceImpl
import iot.temperature.TemperatureEvents
import org.slf4j.LoggerFactory

object Main {

val logger = LoggerFactory.getLogger("iot.registration.Main")
val logger = LoggerFactory.getLogger("iot.Main")

def main(args: Array[String]): Unit = {
val system =
Expand All @@ -29,20 +35,29 @@ object Main {
ClusterBootstrap(system).start()

Registration.init(system)
val registrationEventProducerService =
RegistrationEvents.eventProducerService(system)

val eventProducerService = PublishEvents.eventProducerService(system)
SensorTwin.init(system)
TemperatureEvents.initPushedEventsConsumer(system)
val pushedTemperatureEventsHandler =
TemperatureEvents.pushedEventsGrpcHandler(system)

val grpcInterface =
system.settings.config.getString("iot-service.grpc.interface")
val grpcPort =
system.settings.config.getInt("iot-service.grpc.port")
val grpcService = new RegistrationServiceImpl(system)
RegistrationServer.start(
val registrationService = new RegistrationServiceImpl(system)
val sensorTwinService = new SensorTwinServiceImpl(system)

IotServer.start(
grpcInterface,
grpcPort,
system,
grpcService,
eventProducerService)
registrationService,
registrationEventProducerService,
pushedTemperatureEventsHandler,
sensorTwinService)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation

object PublishEvents {
object RegistrationEvents {

def eventProducerService(system: ActorSystem[_])
: PartialFunction[HttpRequest, Future[HttpResponse]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class RegistrationServiceImpl(system: ActorSystem[_])
val entityRef = sharding.entityRefFor(Registration.EntityKey, in.sensorId)
val reply: Future[Done] =
entityRef.askWithStatus(
Registration
.Register(Registration.SecretDataValue(in.secret), _))
Registration.Register(Registration.SecretDataValue(in.secret), _))
val response = reply.map(_ => Empty.defaultInstance)
convertError(response)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package iot.temperature

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.persistence.typed.state.scaladsl.Effect
import iot.CborSerializable

object SensorTwin {
val EntityKey = EntityTypeKey[Command]("SensorTwin")

sealed trait Command extends CborSerializable

final case class State(temperature: Int) extends CborSerializable

final case class UpdateTemperature(
temperature: Int,
replyTo: ActorRef[StatusReply[Done]])
extends Command

final case class GetTemperature(replyTo: ActorRef[StatusReply[Int]])
extends Command

def init(system: ActorSystem[_]): Unit = {
ClusterSharding(system).init(Entity(EntityKey)(entityContext =>
SensorTwin(entityContext.entityId)))
}

def apply(sensorId: String): Behavior[Command] =
DurableStateBehavior[Command, State](
PersistenceId(EntityKey.name, sensorId),
State(0),
(state, cmd) =>
cmd match {
case UpdateTemperature(temperature, replyTo) =>
Effect
.persist(State(temperature))
.thenReply(replyTo)(_ => StatusReply.Ack)

case GetTemperature(replyTo) =>
Effect.reply(replyTo)(StatusReply.success(state.temperature))
})

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package iot.temperature

import java.util.concurrent.TimeoutException

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.grpc.GrpcServiceException
import akka.util.Timeout
import io.grpc.Status
import iot.temperature.proto.SensorTwinService

class SensorTwinServiceImpl(system: ActorSystem[_]) extends SensorTwinService {

import system.executionContext

implicit private val timeout: Timeout =
Timeout.create(
system.settings.config.getDuration("iot-service.ask-timeout"))

private val sharding = ClusterSharding(system)

override def getTemperature(
in: proto.GetTemperatureRequest): Future[proto.CurrentTemperature] = {
val entityRef = sharding.entityRefFor(SensorTwin.EntityKey, in.sensorId)
val reply: Future[Int] =
entityRef.askWithStatus(SensorTwin.GetTemperature(_))
val response =
reply.map(temperature =>
proto.CurrentTemperature(in.sensorId, temperature))
convertError(response)
}

private def convertError[T](response: Future[T]): Future[T] = {
response.recoverWith {
case _: TimeoutException =>
Future.failed(
new GrpcServiceException(
Status.UNAVAILABLE.withDescription("Operation timed out")))
case exc =>
Future.failed(
new GrpcServiceException(
Status.INVALID_ARGUMENT.withDescription(exc.getMessage)))
}
}
}
Loading

0 comments on commit fdb2742

Please sign in to comment.