Skip to content

Commit

Permalink
chore: remove sensorId (#985)
Browse files Browse the repository at this point in the history
* Learned from domain expert that sensor id (DevEUI) should not be used as entityId.
* To not convolute the sample we just skip the sensor id.
  • Loading branch information
patriknw authored Sep 4, 2023
1 parent e996466 commit a3bed24
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 37 deletions.
4 changes: 2 additions & 2 deletions samples/grpc/iot-service-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Register a sensor:

```shell
grpcurl -d '{"sensor_id":"temp-1", "secret":"foo"}' -plaintext 127.0.0.1:8101 iot.registration.RegistrationService.Register
grpcurl -d '{"sensor_entity_id":"temp-1", "secret":"foo"}' -plaintext 127.0.0.1:8101 iot.registration.RegistrationService.Register
```

or same `grpcurl` commands to port 8102 to reach node 2.
Expand All @@ -51,6 +51,6 @@
8. After registration of a sensor the EdgeApp will simulate temperature readings, which will be consumed by the iot-service. Read current temperature, which should be updated with a random value:

9. ```shell
grpcurl -d '{"sensor_id":"temp-1"}' -plaintext 127.0.0.1:8101 iot.temperature.SensorTwinService.GetTemperature
grpcurl -d '{"sensor_entity_id":"temp-1"}' -plaintext 127.0.0.1:8101 iot.temperature.SensorTwinService.GetTemperature
```

Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ package iot.registration;
// Events published to external services

message Registered {
string sensor_id = 1;
SecretDataValue secret = 2;
SecretDataValue secret = 1;
}

message SecretDataValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ service RegistrationService {


message RegisterRequest {
string sensor_id = 1;
string sensor_entity_id = 1;
string secret = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ service SensorTwinService {
}

message GetTemperatureRequest {
string sensor_id = 1;
string sensor_entity_id = 1;
}

message CurrentTemperature {
string sensor_id = 1;
string sensor_entity_id = 1;
int32 temperature = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ package iot.temperature;
// Events consumed from external services

message TemperatureRead {
string sensor_id = 1;
int32 temperature = 2;
int32 temperature = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ object Registration {
Registration(entityContext.entityId)))
}

def apply(sensorId: String): Behavior[Command] = {
def apply(entityId: String): Behavior[Command] = {
EventSourcedBehavior
.withEnforcedReplies[Command, Event, State](
persistenceId = PersistenceId(EntityKey.name, sensorId),
persistenceId = PersistenceId(EntityKey.name, entityId),
emptyState = State.empty,
commandHandler =
(state, command) => handleCommand(state, command),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ object RegistrationEvents {
private def transformRegistered(
envelope: EventEnvelope[Registration.Registered]): proto.Registered = {
val event = envelope.event
proto.Registered(
sensorId = PersistenceId.extractEntityId(envelope.persistenceId),
secret = Some(proto.SecretDataValue(event.secret.value)))
proto.Registered(secret = Some(proto.SecretDataValue(event.secret.value)))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class RegistrationServiceImpl(system: ActorSystem[_])
private val sharding = ClusterSharding(system)

override def register(in: proto.RegisterRequest): Future[Empty] = {
logger.info("register sensor {}", in.sensorId)
val entityRef = sharding.entityRefFor(Registration.EntityKey, in.sensorId)
logger.info("register sensor {}", in.sensorEntityId)
val entityRef = sharding.entityRefFor(Registration.EntityKey, in.sensorEntityId)
val reply: Future[Done] =
entityRef.askWithStatus(
Registration.Register(Registration.SecretDataValue(in.secret), _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ object SensorTwin {
SensorTwin(entityContext.entityId)))
}

def apply(sensorId: String): Behavior[Command] =
def apply(entityId: String): Behavior[Command] =
DurableStateBehavior[Command, State](
PersistenceId(EntityKey.name, sensorId),
PersistenceId(EntityKey.name, entityId),
State(0),
(state, cmd) =>
cmd match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class SensorTwinServiceImpl(system: ActorSystem[_]) extends SensorTwinService {

override def getTemperature(
in: proto.GetTemperatureRequest): Future[proto.CurrentTemperature] = {
val entityRef = sharding.entityRefFor(SensorTwin.EntityKey, in.sensorId)
val entityRef = sharding.entityRefFor(SensorTwin.EntityKey, in.sensorEntityId)
val reply: Future[Int] =
entityRef.askWithStatus(SensorTwin.GetTemperature(_))
val response =
reply.map(temperature =>
proto.CurrentTemperature(in.sensorId, temperature))
proto.CurrentTemperature(in.sensorEntityId, temperature))
convertError(response)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.http.scaladsl.model.HttpResponse
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.persistence.typed.PersistenceId
import akka.projection.Projection
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
Expand Down Expand Up @@ -76,9 +77,11 @@ object TemperatureEvents {
envelope.eventOption)

envelope.event match {
case proto.TemperatureRead(sensorId, temperature, _) =>
case proto.TemperatureRead(temperature, _) =>
val entityId =
PersistenceId.extractEntityId(envelope.persistenceId)
val entityRef =
sharding.entityRefFor(SensorTwin.EntityKey, sensorId)
sharding.entityRefFor(SensorTwin.EntityKey, entityId)
entityRef.askWithStatus(
SensorTwin.UpdateTemperature(temperature, _))
case unknown =>
Expand All @@ -95,8 +98,8 @@ object TemperatureEvents {
}

// Split the slices into N ranges
val numberOfSliceRanges: Int = system.settings.config.getInt(
"iot-service.temperature.projections-slice-count")
val numberOfSliceRanges: Int = system.settings.config
.getInt("iot-service.temperature.projections-slice-count")
val sliceRanges = EventSourcedProvider.sliceRanges(
system,
R2dbcReadJournal.Identifier,
Expand Down
25 changes: 12 additions & 13 deletions samples/grpc/iot-service-scala/src/test/scala/iot/EdgeApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ object EdgeApp {

override def process(envelope: EventEnvelope[AnyRef]): Future[Done] = {
envelope.event match {
case registration.proto.Registered(sensorId, secret, _) =>
case registration.proto.Registered(secret, _) =>
val entityId = PersistenceId.extractEntityId(envelope.persistenceId)
log.info(
"Consumed registered sensor {} in projection {}",
sensorId,
entityId,
projectionId.id)
spawner ! Root.StartSensorSimulator(sensorId)
spawner ! Root.StartSensorSimulator(entityId)
Future.successful(Done)
case other =>
throw new IllegalArgumentException(
Expand All @@ -61,7 +62,7 @@ object EdgeApp {

object Root {
sealed trait Command
final case class StartSensorSimulator(sensorId: String) extends Command
final case class StartSensorSimulator(entityId: String) extends Command

def apply(): Behavior[Command] =
Behaviors.setup[Command] { context =>
Expand Down Expand Up @@ -108,10 +109,8 @@ object EdgeApp {
SensorSimulator.TemperatureRead,
temperature.proto.TemperatureRead] { envelope =>
val event = envelope.event
val sensorId =
PersistenceId.extractEntityId(envelope.persistenceId)
Future.successful(Some(
temperature.proto.TemperatureRead(sensorId, event.temperature)))
temperature.proto.TemperatureRead(event.temperature)))
}

val eventProducer = EventProducerPush[SensorSimulator.Event](
Expand Down Expand Up @@ -144,9 +143,9 @@ object EdgeApp {
initRegistrationProjection()
initTemperaturePush()

Behaviors.receiveMessage { case StartSensorSimulator(sensorId) =>
if (context.child(sensorId).isEmpty)
context.spawn(SensorSimulator(sensorId), sensorId)
Behaviors.receiveMessage { case StartSensorSimulator(entityId) =>
if (context.child(entityId).isEmpty)
context.spawn(SensorSimulator(entityId), entityId)
Behaviors.same
}
}
Expand All @@ -165,14 +164,14 @@ object EdgeApp {

final case class TemperatureRead(temperature: Int) extends Event

def apply(sensorId: String): Behavior[Command] = {
def apply(entityId: String): Behavior[Command] = {
Behaviors.setup { context =>
context.log.info("Starting sensor simulation [{}]", sensorId)
context.log.info("Starting sensor simulation [{}]", entityId)
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, 3.seconds)

EventSourcedBehavior[Command, Event, State.type](
PersistenceId(EntityTypeName, sensorId),
PersistenceId(EntityTypeName, entityId),
State,
(_, cmd) =>
cmd match {
Expand Down

0 comments on commit a3bed24

Please sign in to comment.