Skip to content

Commit

Permalink
Replaced MessagesModel.registerHandler with a stream (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
Levi-Lesches authored Sep 26, 2024
1 parent c4ba0ee commit 1957e52
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 116 deletions.
3 changes: 3 additions & 0 deletions lib/src/data/metrics/vitals.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class VitalsMetrics extends Metrics {
}
}

/// Publicly exposes [notifyListeners].
void notify() => notifyListeners();

@override
List<MetricLine> get allMetrics => [
MetricLine("Voltage: ${drive.batteryVoltage.toStringAsFixed(2)} V", severity: voltageSeverity),
Expand Down
9 changes: 0 additions & 9 deletions lib/src/data/protobuf.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ export "package:protobuf/protobuf.dart" show GeneratedMessageGenericExtensions;
/// The `.fromBuffer` constructor is a type of [MessageDecoder].
typedef MessageDecoder<T extends Message> = T Function(List<int> data);

/// A callback to execute with a specific serialized Protobuf message.
typedef MessageHandler<T extends Message> = void Function(T);

/// A callback to handle any [WrappedMessage].
typedef WrappedMessageHandler = void Function(WrappedMessage);

/// A callback to execute with raw Protobuf data.
typedef RawDataHandler = void Function(List<int> data);

/// Gets the name of the command message for the given device.
String getCommandName(Device device) => switch (device) {
Device.ARM => "ArmCommand",
Expand Down
6 changes: 5 additions & 1 deletion lib/src/models/data/logs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ class LogsModel extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<BurtLog>(name: BurtLog().messageName, decoder: BurtLog.fromBuffer, handler: handleLog);
models.messages.stream.onMessage<BurtLog>(
name: BurtLog().messageName,
constructor: BurtLog.fromBuffer,
callback: handleLog,
);
saveToFileTimer = Timer.periodic(saveToFileInterval, saveToFile);
}

Expand Down
79 changes: 24 additions & 55 deletions lib/src/models/data/messages.dart
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
import "package:protobuf/protobuf.dart";
import "dart:async";

import "package:rover_dashboard/data.dart";
import "package:rover_dashboard/models.dart";

/// A mixin that delegates [WrappedMessage]s to a handler via [registerHandler].
///
/// - Use [registerHandler] to invoke your handler whenever a new message is received.
/// - Use [removeHandler] to remove your handler.
/// - Override [allowedFallthrough] to allow certain massages to pass unhandled.
class MessagesModel {
/// A set of message types that are allowed to pass through without being handled.
static const Set<String> allowedFallthrough = {"AutonomyData", "Disconnect"};

/// A set of handlers to be called based on [WrappedMessage.name].
final Map<String, RawDataHandler> _handlers = {};

/// Delegates the message contents to the appropriate handler.
void onMessage(WrappedMessage wrapper) {
final rawHandler = _handlers[wrapper.name];
if (rawHandler == null) {
if (allowedFallthrough.contains(wrapper.name)) return;
throw StateError("No handler registered for ${wrapper.name} message");
}
try {
return rawHandler(wrapper.data);
} on InvalidProtocolBufferException {
// Data is corrupt, ignore it
}
}
export "package:burt_network/burt_network.dart" show WrappedMessageStream;

/// A single model to consolidate all messages.
///
/// Messages can arrive from serial devices or UDP devices, and any device can be unexpectedly
/// disconnected at any time. To simplify the logic of subscribing for new messages, this model
/// holds a [stream] of [WrappedMessage]s that anyone can subscribe to. When a message arrives,
/// simply call [addMessage] to ensure it will be added to the stream.
///
/// Note that having this model forward [stream] to the serial and UDP streams would *not* work,
/// as those streams can be closed when devices are disconnected, and new streams are created when
/// devices are connected for the first time. In that case, anyone who subscribes to the stream
/// before a device is connected (eg, in [Model.init]) won't get messages received afterwards. To
/// get around this issue, this model uses the same [StreamController] the entire time.
class MessagesModel extends Model {
/// Sends a command over the network or over Serial.
void sendMessage(Message message, {bool checkVersion = true}) {
final shouldCheck = checkVersion && models.settings.dashboard.versionChecking;
Expand All @@ -42,34 +31,14 @@ class MessagesModel {
models.sockets.data.sendMessage(message);
}

/// Adds a handler for the given message type.
///
/// When a new message is received, [onMessage] checks to see if its type matches [name].
/// If so, it calls [decoder] to parse the binary data into a Protobuf class, and then
/// calls [handler] with that parsed data class.
///
/// For example, with a message called `ScienceData`, you would use this function as:
/// ```dart
/// registerHandler<ScienceData>(
/// name: ScienceData().messageName, // identify the data as a ScienceData message
/// decoder: ScienceData.fromBuffer, // deserialize into a ScienceData instance
/// handler: (ScienceData data) => print(data.methane), // do something with the data
/// );
/// ```
void registerHandler<T extends Message>({
required String name,
required MessageDecoder<T> decoder,
required MessageHandler<T> handler,
}) {
if (_handlers.containsKey(name)) { // handler was already registered
throw ArgumentError("There's already a message handler for $name.");
} else {
_handlers[name] = (data) => handler(decoder(data));
}
}
final _controller = StreamController<WrappedMessage>.broadcast();

/// The stream of messages. Use [WrappedMessageStream.onMessage] to subscribe to messages.
Stream<WrappedMessage> get stream => _controller.stream;

/// Adds a message to the [stream].
void addMessage(WrappedMessage message) => _controller.add(message);

/// Removes the handler for a given message type.
///
/// This is useful if you register a handler to update a piece of UI that is no longer on-screen.
void removeHandler(String name) => _handlers.remove(name);
@override
Future<void> init() async { }
}
4 changes: 2 additions & 2 deletions lib/src/models/data/serial.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import "package:rover_dashboard/models.dart";
/// names instead.
///
/// Send messages to the connected devices using the [sendMessage] method, and all messages
/// received all ports are forwarded to [MessagesModel.onMessage].
/// received all ports are forwarded to [MessagesModel.addMessage].
class SerialModel extends Model {
/// All the connected devices and their respective serial ports.
///
Expand All @@ -37,7 +37,7 @@ class SerialModel extends Model {
models.home.setMessage(severity: Severity.error, text: "Could not connect to $port");
return;
}
device.messages.listen(models.messages.onMessage);
device.messages.listen(models.messages.addMessage);
models.home.setMessage(severity: Severity.info, text: "Connected to $port");
devices[port] = device;
notifyListeners();
Expand Down
2 changes: 1 addition & 1 deletion lib/src/models/data/sockets.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Sockets extends Model {
? onConnect(socket.device)
: onDisconnect(socket.device),
);
socket.messages.listen(models.messages.onMessage);
socket.messages.listen(models.messages.addMessage);
await socket.init();
}
final level = Logger.level;
Expand Down
32 changes: 16 additions & 16 deletions lib/src/models/data/video.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ class VideoModel extends Model {
};

/// How many frames came in the network in the past second.
///
///
/// This number is updated every frame. Use [networkFps] in the UI.
Map<CameraName, int> framesThisSecond = {
for (final name in CameraName.values)
for (final name in CameraName.values)
name: 0,
};

/// How many frames came in the network in the past second.
Map<CameraName, int> networkFps = {};

/// Triggers when it's time to update a new frame.
///
///
/// This is kept here to ensure all widgets are in sync.
Timer? frameUpdater;

Expand All @@ -40,15 +40,15 @@ class VideoModel extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<VideoData>(
models.messages.stream.onMessage<VideoData>(
name: VideoData().messageName,
decoder: VideoData.fromBuffer,
handler: handleData,
constructor: VideoData.fromBuffer,
callback: handleData,
);
models.messages.registerHandler<VideoCommand>(
models.messages.stream.onMessage<VideoCommand>(
name: VideoCommand().messageName,
decoder: VideoCommand.fromBuffer,
handler: (command) => _handshake = command,
constructor: VideoCommand.fromBuffer,
callback: (command) => _handshake = command,
);
fpsTimer = Timer.periodic(const Duration(seconds: 1), resetNetworkFps);
reset();
Expand All @@ -58,7 +58,7 @@ class VideoModel extends Model {
void resetNetworkFps([_]) {
networkFps = Map.from(framesThisSecond);
framesThisSecond = {
for (final name in CameraName.values)
for (final name in CameraName.values)
name: 0,
};
notifyListeners();
Expand Down Expand Up @@ -110,13 +110,13 @@ class VideoModel extends Model {
/// Takes a screenshot of the current frame.
Future<void> saveFrame(CameraName name) async {
final cachedFrame = feeds[name]?.frame;
if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name");
if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name");
await services.files.writeImage(cachedFrame, name.humanName);
models.home.setMessage(severity: Severity.info, text: "Screenshot saved");
}

/// Updates settings for the given camera.
Future<void> updateCamera(String id, CameraDetails details, {bool verify = true}) async {
Future<void> updateCamera(String id, CameraDetails details, {bool verify = true}) async {
_handshake = null;
final command = VideoCommand(id: id, details: details);
models.sockets.video.sendMessage(command);
Expand All @@ -126,7 +126,7 @@ class VideoModel extends Model {
}

/// Enables or disables the given camera.
///
///
/// This function is called automatically, so if the camera is not connected or otherwise available,
/// it'll fail silently. However, if the server simply doesn't respond, it'll show a warning.
Future<void> toggleCamera(CameraName name, {required bool enable}) async {
Expand All @@ -141,16 +141,16 @@ class VideoModel extends Model {
await Future<void>.delayed(const Duration(seconds: 2));
if (_handshake == null) {
models.home.setMessage(
severity: Severity.warning,
severity: Severity.warning,
text: "Could not ${enable ? 'enable' : 'disable'} the ${name.humanName} camera",
);
}
}
}
}

/// An exception thrown when the rover does not respond to a handshake.
///
/// Certain changes require a handshake to ensure the rover has received and applied the change.
/// If the rover fails to acknowledge or apply the change, a response will not be sent. Throw
/// this error to indicate that.
/// this error to indicate that.
class RequestNotAccepted implements Exception { }
31 changes: 16 additions & 15 deletions lib/src/models/rover/metrics.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,32 @@ class RoverMetrics extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<DriveData>(
models.messages.stream.onMessage(
name: DriveData().messageName,
decoder: DriveData.fromBuffer,
handler: drive.update,
constructor: DriveData.fromBuffer,
callback: drive.update,
);
models.messages.registerHandler<ScienceData>(
models.messages.stream.onMessage(
name: ScienceData().messageName,
decoder: ScienceData.fromBuffer,
handler: science.update,
constructor: ScienceData.fromBuffer,
callback: science.update,
);
models.messages.registerHandler<RoverPosition>(
models.messages.stream.onMessage(
name: RoverPosition().messageName,
decoder: RoverPosition.fromBuffer,
handler: position.update,
constructor: RoverPosition.fromBuffer,
callback: position.update,
);
models.messages.registerHandler<ArmData>(
models.messages.stream.onMessage(
name: ArmData().messageName,
decoder: ArmData.fromBuffer,
handler: arm.update,
constructor: ArmData.fromBuffer,
callback: arm.update,
);
models.messages.registerHandler<GripperData>(
models.messages.stream.onMessage(
name: GripperData().messageName,
decoder: GripperData.fromBuffer,
handler: gripper.update,
constructor: GripperData.fromBuffer,
callback: gripper.update,
);
drive.addListener(vitals.notify);
// versionTimer = Timer.periodic(versionInterval, _sendVersions);
}

Expand Down
18 changes: 9 additions & 9 deletions lib/src/models/rover/settings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import "package:rover_dashboard/models.dart";
const confirmationDelay = Duration(seconds: 1);

/// Updates sensitive settings on the rover.
///
/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP,
///
/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP,
/// we have no way to actually guarantee this, so we simply ask that the rover send the exact same
/// message in response (see [UpdateSetting]). If we do not get the response after waiting for a
/// confirmationDelay], we conclude that the rover didn't receive our request, similar to heartbeat.
Expand All @@ -22,15 +22,15 @@ class RoverSettings extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<UpdateSetting>(
models.messages.stream.onMessage(
name: UpdateSetting().messageName,
decoder: UpdateSetting.fromBuffer,
handler: (settings) => _handshakes++,
constructor: UpdateSetting.fromBuffer,
callback: (settings) => _handshakes++,
);
}

/// Sends an [UpdateSetting] and awaits a response.
///
///
/// The response must be an echo of the data sent, to ensure the rover acknowledges the data.
/// Returns true if the handshake succeeds.
Future<bool> tryChangeSettings(UpdateSetting value) async {
Expand All @@ -45,15 +45,15 @@ class RoverSettings extends Model {
}

/// Sets the status of the rover.
///
///
/// See [RoverStatus] for details.
Future<void> setStatus(RoverStatus value) async {
if (!models.rover.isConnected) return;
if (value == RoverStatus.AUTONOMOUS || value == RoverStatus.IDLE) {
for (final controller in models.rover.controllers) {
controller.setMode(OperatingMode.none);
}
} else if (value == RoverStatus.MANUAL) {
} else if (value == RoverStatus.MANUAL) {
models.rover.setDefaultControls();
} else {
final message = UpdateSetting(status: value);
Expand All @@ -66,7 +66,7 @@ class RoverSettings extends Model {
settings.status = value;
models.rover.status.value = value;
notifyListeners();
}
}

/// Changes the color of the rover's LED strip.
Future<bool> setColor(ProtoColor color, {required bool blink}) async {
Expand Down
12 changes: 8 additions & 4 deletions lib/src/models/view/builders/autonomy_command.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "dart:async";

import "package:flutter/foundation.dart";
import "package:rover_dashboard/data.dart";
import "package:rover_dashboard/models.dart";
Expand All @@ -22,19 +24,21 @@ class AutonomyCommandBuilder extends ValueBuilder<AutonomyCommand> {
@override
List<ChangeNotifier> get otherBuilders => [models.rover.status];

StreamSubscription<AutonomyCommand>? _subscription;

/// Listens for incoming confirmations from the rover that it received the command.
Future<void> init() async {
await Future<void>.delayed(const Duration(seconds: 1));
models.messages.registerHandler<AutonomyCommand>(
_subscription = models.messages.stream.onMessage(
name: AutonomyCommand().messageName,
decoder: AutonomyCommand.fromBuffer,
handler: (data) => _handshake = data,
constructor: AutonomyCommand.fromBuffer,
callback: (data) => _handshake = data,
);
}

@override
void dispose() {
models.messages.removeHandler(AutonomyCommand().messageName);
_subscription?.cancel();
super.dispose();
}

Expand Down
Loading

0 comments on commit 1957e52

Please sign in to comment.