diff --git a/lib/src/data/metrics/vitals.dart b/lib/src/data/metrics/vitals.dart index af8b0f48f..f76fa30ce 100644 --- a/lib/src/data/metrics/vitals.dart +++ b/lib/src/data/metrics/vitals.dart @@ -37,6 +37,9 @@ class VitalsMetrics extends Metrics { } } + /// Publicly exposes [notifyListeners]. + void notify() => notifyListeners(); + @override List get allMetrics => [ MetricLine("Voltage: ${drive.batteryVoltage.toStringAsFixed(2)} V", severity: voltageSeverity), diff --git a/lib/src/data/protobuf.dart b/lib/src/data/protobuf.dart index b5dd0d048..1d952946a 100644 --- a/lib/src/data/protobuf.dart +++ b/lib/src/data/protobuf.dart @@ -10,15 +10,6 @@ export "package:protobuf/protobuf.dart" show GeneratedMessageGenericExtensions; /// The `.fromBuffer` constructor is a type of [MessageDecoder]. typedef MessageDecoder = T Function(List data); -/// A callback to execute with a specific serialized Protobuf message. -typedef MessageHandler = void Function(T); - -/// A callback to handle any [WrappedMessage]. -typedef WrappedMessageHandler = void Function(WrappedMessage); - -/// A callback to execute with raw Protobuf data. -typedef RawDataHandler = void Function(List data); - /// Gets the name of the command message for the given device. String getCommandName(Device device) => switch (device) { Device.ARM => "ArmCommand", diff --git a/lib/src/models/data/logs.dart b/lib/src/models/data/logs.dart index 6cbebcb62..41efcfa1f 100644 --- a/lib/src/models/data/logs.dart +++ b/lib/src/models/data/logs.dart @@ -48,7 +48,11 @@ class LogsModel extends Model { @override Future init() async { - models.messages.registerHandler(name: BurtLog().messageName, decoder: BurtLog.fromBuffer, handler: handleLog); + models.messages.stream.onMessage( + name: BurtLog().messageName, + constructor: BurtLog.fromBuffer, + callback: handleLog, + ); saveToFileTimer = Timer.periodic(saveToFileInterval, saveToFile); } diff --git a/lib/src/models/data/messages.dart b/lib/src/models/data/messages.dart index d207fffe8..73562cbdf 100644 --- a/lib/src/models/data/messages.dart +++ b/lib/src/models/data/messages.dart @@ -1,34 +1,23 @@ -import "package:protobuf/protobuf.dart"; +import "dart:async"; import "package:rover_dashboard/data.dart"; import "package:rover_dashboard/models.dart"; -/// A mixin that delegates [WrappedMessage]s to a handler via [registerHandler]. -/// -/// - Use [registerHandler] to invoke your handler whenever a new message is received. -/// - Use [removeHandler] to remove your handler. -/// - Override [allowedFallthrough] to allow certain massages to pass unhandled. -class MessagesModel { - /// A set of message types that are allowed to pass through without being handled. - static const Set allowedFallthrough = {"AutonomyData", "Disconnect"}; - - /// A set of handlers to be called based on [WrappedMessage.name]. - final Map _handlers = {}; - - /// Delegates the message contents to the appropriate handler. - void onMessage(WrappedMessage wrapper) { - final rawHandler = _handlers[wrapper.name]; - if (rawHandler == null) { - if (allowedFallthrough.contains(wrapper.name)) return; - throw StateError("No handler registered for ${wrapper.name} message"); - } - try { - return rawHandler(wrapper.data); - } on InvalidProtocolBufferException { - // Data is corrupt, ignore it - } - } +export "package:burt_network/burt_network.dart" show WrappedMessageStream; +/// A single model to consolidate all messages. +/// +/// Messages can arrive from serial devices or UDP devices, and any device can be unexpectedly +/// disconnected at any time. To simplify the logic of subscribing for new messages, this model +/// holds a [stream] of [WrappedMessage]s that anyone can subscribe to. When a message arrives, +/// simply call [addMessage] to ensure it will be added to the stream. +/// +/// Note that having this model forward [stream] to the serial and UDP streams would *not* work, +/// as those streams can be closed when devices are disconnected, and new streams are created when +/// devices are connected for the first time. In that case, anyone who subscribes to the stream +/// before a device is connected (eg, in [Model.init]) won't get messages received afterwards. To +/// get around this issue, this model uses the same [StreamController] the entire time. +class MessagesModel extends Model { /// Sends a command over the network or over Serial. void sendMessage(Message message, {bool checkVersion = true}) { final shouldCheck = checkVersion && models.settings.dashboard.versionChecking; @@ -42,34 +31,14 @@ class MessagesModel { models.sockets.data.sendMessage(message); } - /// Adds a handler for the given message type. - /// - /// When a new message is received, [onMessage] checks to see if its type matches [name]. - /// If so, it calls [decoder] to parse the binary data into a Protobuf class, and then - /// calls [handler] with that parsed data class. - /// - /// For example, with a message called `ScienceData`, you would use this function as: - /// ```dart - /// registerHandler( - /// name: ScienceData().messageName, // identify the data as a ScienceData message - /// decoder: ScienceData.fromBuffer, // deserialize into a ScienceData instance - /// handler: (ScienceData data) => print(data.methane), // do something with the data - /// ); - /// ``` - void registerHandler({ - required String name, - required MessageDecoder decoder, - required MessageHandler handler, - }) { - if (_handlers.containsKey(name)) { // handler was already registered - throw ArgumentError("There's already a message handler for $name."); - } else { - _handlers[name] = (data) => handler(decoder(data)); - } - } + final _controller = StreamController.broadcast(); + + /// The stream of messages. Use [WrappedMessageStream.onMessage] to subscribe to messages. + Stream get stream => _controller.stream; + + /// Adds a message to the [stream]. + void addMessage(WrappedMessage message) => _controller.add(message); - /// Removes the handler for a given message type. - /// - /// This is useful if you register a handler to update a piece of UI that is no longer on-screen. - void removeHandler(String name) => _handlers.remove(name); + @override + Future init() async { } } diff --git a/lib/src/models/data/serial.dart b/lib/src/models/data/serial.dart index 238409130..b52e1977e 100644 --- a/lib/src/models/data/serial.dart +++ b/lib/src/models/data/serial.dart @@ -10,7 +10,7 @@ import "package:rover_dashboard/models.dart"; /// names instead. /// /// Send messages to the connected devices using the [sendMessage] method, and all messages -/// received all ports are forwarded to [MessagesModel.onMessage]. +/// received all ports are forwarded to [MessagesModel.addMessage]. class SerialModel extends Model { /// All the connected devices and their respective serial ports. /// @@ -37,7 +37,7 @@ class SerialModel extends Model { models.home.setMessage(severity: Severity.error, text: "Could not connect to $port"); return; } - device.messages.listen(models.messages.onMessage); + device.messages.listen(models.messages.addMessage); models.home.setMessage(severity: Severity.info, text: "Connected to $port"); devices[port] = device; notifyListeners(); diff --git a/lib/src/models/data/sockets.dart b/lib/src/models/data/sockets.dart index ae1f10ea0..68ff50cea 100644 --- a/lib/src/models/data/sockets.dart +++ b/lib/src/models/data/sockets.dart @@ -55,7 +55,7 @@ class Sockets extends Model { ? onConnect(socket.device) : onDisconnect(socket.device), ); - socket.messages.listen(models.messages.onMessage); + socket.messages.listen(models.messages.addMessage); await socket.init(); } final level = Logger.level; diff --git a/lib/src/models/data/video.dart b/lib/src/models/data/video.dart index 6a9b9dc71..b1edebc46 100644 --- a/lib/src/models/data/video.dart +++ b/lib/src/models/data/video.dart @@ -17,10 +17,10 @@ class VideoModel extends Model { }; /// How many frames came in the network in the past second. - /// + /// /// This number is updated every frame. Use [networkFps] in the UI. Map framesThisSecond = { - for (final name in CameraName.values) + for (final name in CameraName.values) name: 0, }; @@ -28,7 +28,7 @@ class VideoModel extends Model { Map networkFps = {}; /// Triggers when it's time to update a new frame. - /// + /// /// This is kept here to ensure all widgets are in sync. Timer? frameUpdater; @@ -40,15 +40,15 @@ class VideoModel extends Model { @override Future init() async { - models.messages.registerHandler( + models.messages.stream.onMessage( name: VideoData().messageName, - decoder: VideoData.fromBuffer, - handler: handleData, + constructor: VideoData.fromBuffer, + callback: handleData, ); - models.messages.registerHandler( + models.messages.stream.onMessage( name: VideoCommand().messageName, - decoder: VideoCommand.fromBuffer, - handler: (command) => _handshake = command, + constructor: VideoCommand.fromBuffer, + callback: (command) => _handshake = command, ); fpsTimer = Timer.periodic(const Duration(seconds: 1), resetNetworkFps); reset(); @@ -58,7 +58,7 @@ class VideoModel extends Model { void resetNetworkFps([_]) { networkFps = Map.from(framesThisSecond); framesThisSecond = { - for (final name in CameraName.values) + for (final name in CameraName.values) name: 0, }; notifyListeners(); @@ -110,13 +110,13 @@ class VideoModel extends Model { /// Takes a screenshot of the current frame. Future saveFrame(CameraName name) async { final cachedFrame = feeds[name]?.frame; - if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name"); + if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name"); await services.files.writeImage(cachedFrame, name.humanName); models.home.setMessage(severity: Severity.info, text: "Screenshot saved"); } /// Updates settings for the given camera. - Future updateCamera(String id, CameraDetails details, {bool verify = true}) async { + Future updateCamera(String id, CameraDetails details, {bool verify = true}) async { _handshake = null; final command = VideoCommand(id: id, details: details); models.sockets.video.sendMessage(command); @@ -126,7 +126,7 @@ class VideoModel extends Model { } /// Enables or disables the given camera. - /// + /// /// This function is called automatically, so if the camera is not connected or otherwise available, /// it'll fail silently. However, if the server simply doesn't respond, it'll show a warning. Future toggleCamera(CameraName name, {required bool enable}) async { @@ -141,16 +141,16 @@ class VideoModel extends Model { await Future.delayed(const Duration(seconds: 2)); if (_handshake == null) { models.home.setMessage( - severity: Severity.warning, + severity: Severity.warning, text: "Could not ${enable ? 'enable' : 'disable'} the ${name.humanName} camera", ); } - } + } } /// An exception thrown when the rover does not respond to a handshake. /// /// Certain changes require a handshake to ensure the rover has received and applied the change. /// If the rover fails to acknowledge or apply the change, a response will not be sent. Throw -/// this error to indicate that. +/// this error to indicate that. class RequestNotAccepted implements Exception { } diff --git a/lib/src/models/rover/metrics.dart b/lib/src/models/rover/metrics.dart index a6815617f..a1fe22ead 100644 --- a/lib/src/models/rover/metrics.dart +++ b/lib/src/models/rover/metrics.dart @@ -44,31 +44,32 @@ class RoverMetrics extends Model { @override Future init() async { - models.messages.registerHandler( + models.messages.stream.onMessage( name: DriveData().messageName, - decoder: DriveData.fromBuffer, - handler: drive.update, + constructor: DriveData.fromBuffer, + callback: drive.update, ); - models.messages.registerHandler( + models.messages.stream.onMessage( name: ScienceData().messageName, - decoder: ScienceData.fromBuffer, - handler: science.update, + constructor: ScienceData.fromBuffer, + callback: science.update, ); - models.messages.registerHandler( + models.messages.stream.onMessage( name: RoverPosition().messageName, - decoder: RoverPosition.fromBuffer, - handler: position.update, + constructor: RoverPosition.fromBuffer, + callback: position.update, ); - models.messages.registerHandler( + models.messages.stream.onMessage( name: ArmData().messageName, - decoder: ArmData.fromBuffer, - handler: arm.update, + constructor: ArmData.fromBuffer, + callback: arm.update, ); - models.messages.registerHandler( + models.messages.stream.onMessage( name: GripperData().messageName, - decoder: GripperData.fromBuffer, - handler: gripper.update, + constructor: GripperData.fromBuffer, + callback: gripper.update, ); + drive.addListener(vitals.notify); // versionTimer = Timer.periodic(versionInterval, _sendVersions); } diff --git a/lib/src/models/rover/settings.dart b/lib/src/models/rover/settings.dart index 43db32c6a..099bea3bb 100644 --- a/lib/src/models/rover/settings.dart +++ b/lib/src/models/rover/settings.dart @@ -5,8 +5,8 @@ import "package:rover_dashboard/models.dart"; const confirmationDelay = Duration(seconds: 1); /// Updates sensitive settings on the rover. -/// -/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP, +/// +/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP, /// we have no way to actually guarantee this, so we simply ask that the rover send the exact same /// message in response (see [UpdateSetting]). If we do not get the response after waiting for a /// confirmationDelay], we conclude that the rover didn't receive our request, similar to heartbeat. @@ -22,15 +22,15 @@ class RoverSettings extends Model { @override Future init() async { - models.messages.registerHandler( + models.messages.stream.onMessage( name: UpdateSetting().messageName, - decoder: UpdateSetting.fromBuffer, - handler: (settings) => _handshakes++, + constructor: UpdateSetting.fromBuffer, + callback: (settings) => _handshakes++, ); } /// Sends an [UpdateSetting] and awaits a response. - /// + /// /// The response must be an echo of the data sent, to ensure the rover acknowledges the data. /// Returns true if the handshake succeeds. Future tryChangeSettings(UpdateSetting value) async { @@ -45,7 +45,7 @@ class RoverSettings extends Model { } /// Sets the status of the rover. - /// + /// /// See [RoverStatus] for details. Future setStatus(RoverStatus value) async { if (!models.rover.isConnected) return; @@ -53,7 +53,7 @@ class RoverSettings extends Model { for (final controller in models.rover.controllers) { controller.setMode(OperatingMode.none); } - } else if (value == RoverStatus.MANUAL) { + } else if (value == RoverStatus.MANUAL) { models.rover.setDefaultControls(); } else { final message = UpdateSetting(status: value); @@ -66,7 +66,7 @@ class RoverSettings extends Model { settings.status = value; models.rover.status.value = value; notifyListeners(); - } + } /// Changes the color of the rover's LED strip. Future setColor(ProtoColor color, {required bool blink}) async { diff --git a/lib/src/models/view/builders/autonomy_command.dart b/lib/src/models/view/builders/autonomy_command.dart index b4a8da6d1..54e86bda6 100644 --- a/lib/src/models/view/builders/autonomy_command.dart +++ b/lib/src/models/view/builders/autonomy_command.dart @@ -1,3 +1,5 @@ +import "dart:async"; + import "package:flutter/foundation.dart"; import "package:rover_dashboard/data.dart"; import "package:rover_dashboard/models.dart"; @@ -22,19 +24,21 @@ class AutonomyCommandBuilder extends ValueBuilder { @override List get otherBuilders => [models.rover.status]; + StreamSubscription? _subscription; + /// Listens for incoming confirmations from the rover that it received the command. Future init() async { await Future.delayed(const Duration(seconds: 1)); - models.messages.registerHandler( + _subscription = models.messages.stream.onMessage( name: AutonomyCommand().messageName, - decoder: AutonomyCommand.fromBuffer, - handler: (data) => _handshake = data, + constructor: AutonomyCommand.fromBuffer, + callback: (data) => _handshake = data, ); } @override void dispose() { - models.messages.removeHandler(AutonomyCommand().messageName); + _subscription?.cancel(); super.dispose(); } diff --git a/lib/src/models/view/map.dart b/lib/src/models/view/map.dart index 4a73a5bb6..f9344a963 100644 --- a/lib/src/models/view/map.dart +++ b/lib/src/models/view/map.dart @@ -57,14 +57,16 @@ class AutonomyModel with ChangeNotifier { /// Listens for incoming autonomy or position data. AutonomyModel() { init(); } + StreamSubscription? _subscription; + /// Initializes the view model. Future init() async { recenterRover(); await Future.delayed(const Duration(seconds: 1)); - models.messages.registerHandler( + _subscription = models.messages.stream.onMessage( name: AutonomyData().messageName, - decoder: AutonomyData.fromBuffer, - handler: onNewData, + constructor: AutonomyData.fromBuffer, + callback: onNewData, ); models.rover.metrics.position.addListener(recenterRover); models.settings.addListener(notifyListeners); @@ -75,7 +77,7 @@ class AutonomyModel with ChangeNotifier { @override void dispose() { - models.messages.removeHandler(AutonomyData().messageName); + _subscription?.cancel(); models.settings.removeListener(notifyListeners); models.rover.metrics.position.removeListener(recenterRover); badAppleAudioPlayer.dispose();