From 21b4c948691e30618a6cb00c13b629cc530f387e Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 14 Nov 2024 11:17:43 +0100 Subject: [PATCH] refactor(widget): get rid of `ProcessingContext` and inline it in its callers --- .../matrix-sdk/src/widget/machine/incoming.rs | 2 +- crates/matrix-sdk/src/widget/machine/mod.rs | 11 +- crates/matrix-sdk/src/widget/mod.rs | 149 +++++++++--------- 3 files changed, 84 insertions(+), 78 deletions(-) diff --git a/crates/matrix-sdk/src/widget/machine/incoming.rs b/crates/matrix-sdk/src/widget/machine/incoming.rs index e0ca2c1b968..7d165b59c99 100644 --- a/crates/matrix-sdk/src/widget/machine/incoming.rs +++ b/crates/matrix-sdk/src/widget/machine/incoming.rs @@ -44,7 +44,7 @@ pub(crate) enum IncomingMessage { /// The `MatrixDriver` notified the `WidgetMachine` of a new matrix event. /// /// This means that the machine previously subscribed to some events - /// (`Action::Subscribe` request). + /// ([`crate::widget::Action::Subscribe`] request). MatrixEventReceived(Raw), } diff --git a/crates/matrix-sdk/src/widget/machine/mod.rs b/crates/matrix-sdk/src/widget/machine/mod.rs index d3b600c2c7f..e7b86075b87 100644 --- a/crates/matrix-sdk/src/widget/machine/mod.rs +++ b/crates/matrix-sdk/src/widget/machine/mod.rs @@ -68,7 +68,10 @@ pub(crate) use self::{ incoming::{IncomingMessage, MatrixDriverResponse}, }; -/// Action (a command) that client (driver) must perform. +/// A command to perform in reaction to an [`IncomingMessage`]. +/// +/// There are also initial actions that may be performed at the creation of a +/// [`WidgetMachine`]. #[derive(Debug)] pub(crate) enum Action { /// Send a raw message to the widget. @@ -144,8 +147,10 @@ impl WidgetMachine { capabilities: CapabilitiesState::Unset, }; - let actions = (!init_on_content_load).then(|| machine.negotiate_capabilities()); - (machine, actions.unwrap_or_default()) + let initial_actions = + if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() }; + + (machine, initial_actions) } /// Main entry point to drive the state machine. diff --git a/crates/matrix-sdk/src/widget/mod.rs b/crates/matrix-sdk/src/widget/mod.rs index 3176c5f0889..74e847f0137 100644 --- a/crates/matrix-sdk/src/widget/mod.rs +++ b/crates/matrix-sdk/src/widget/mod.rs @@ -61,6 +61,12 @@ pub struct WidgetDriver { /// /// These can be both requests and responses. to_widget_tx: Sender, + + /// Drop guard for an event handler forwarding all events from the Matrix + /// room to the widget. + /// + /// Only set if a subscription happened ([`Action::Subscribe`]). + event_forwarding_guard: Option, } /// A handle that encapsulates the communication between a widget driver and the @@ -109,7 +115,7 @@ impl WidgetDriver { let (from_widget_tx, from_widget_rx) = async_channel::unbounded(); let (to_widget_tx, to_widget_rx) = async_channel::unbounded(); - let driver = Self { settings, from_widget_rx, to_widget_tx }; + let driver = Self { settings, from_widget_rx, to_widget_tx, event_forwarding_guard: None }; let channels = WidgetDriverHandle { from_widget_tx, to_widget_rx }; (driver, channels) @@ -120,77 +126,69 @@ impl WidgetDriver { /// The function returns once the widget is disconnected or any terminal /// error occurs. pub async fn run( - self, + mut self, room: Room, capabilities_provider: impl CapabilitiesProvider, ) -> Result<(), ()> { // Create a channel so that we can conveniently send all messages to it. - let (incoming_messages_tx, mut incoming_messages_rx) = unbounded_channel(); + // + // It will receive: + // - all incoming messages from the widget + // - all responses from the Matrix driver + // - all events from the Matrix driver, if subscribed + let (incoming_msg_tx, mut incoming_msg_rx) = unbounded_channel(); // Forward all of the incoming messages from the widget. tokio::spawn({ - let incoming_messages_tx = incoming_messages_tx.clone(); + let incoming_msg_tx = incoming_msg_tx.clone(); + let from_widget_rx = self.from_widget_rx.clone(); async move { - while let Ok(msg) = self.from_widget_rx.recv().await { - let _ = incoming_messages_tx.send(IncomingMessage::WidgetMessage(msg)); + while let Ok(msg) = from_widget_rx.recv().await { + let _ = incoming_msg_tx.send(IncomingMessage::WidgetMessage(msg)); } } }); // Create widget API machine. - let (client_api, initial_actions) = WidgetMachine::new( + let (mut widget_machine, initial_actions) = WidgetMachine::new( self.settings.widget_id().to_owned(), room.room_id().to_owned(), self.settings.init_on_content_load(), None, ); - // The environment for the processing of actions from the widget machine. - let mut ctx = ProcessingContext { - widget_machine: client_api, - matrix_driver: MatrixDriver::new(room.clone()), - event_forwarding_guard: None, - to_widget_tx: self.to_widget_tx, - events_tx: incoming_messages_tx, - capabilities_provider, - }; + let matrix_driver = MatrixDriver::new(room.clone()); // Process initial actions that "initialise" the widget api machine. for action in initial_actions { - ctx.process_action(action).await?; + self.process_action(&matrix_driver, &incoming_msg_tx, &capabilities_provider, action) + .await?; } // Process incoming messages as they're coming in. - while let Some(message) = incoming_messages_rx.recv().await { - ctx.process_incoming_message(message).await?; - } - - Ok(()) - } -} - -/// A small wrapper of all the data that we need to process an incoming event. -struct ProcessingContext { - widget_machine: WidgetMachine, - matrix_driver: MatrixDriver, - event_forwarding_guard: Option, - to_widget_tx: Sender, - events_tx: UnboundedSender, - capabilities_provider: T, -} - -impl ProcessingContext { - /// Compute the actions for a single given incoming message, and performs - /// them immediately. - async fn process_incoming_message(&mut self, msg: IncomingMessage) -> Result<(), ()> { - for action in self.widget_machine.process(msg) { - self.process_action(action).await?; + while let Some(msg) = incoming_msg_rx.recv().await { + for action in widget_machine.process(msg) { + self.process_action( + &matrix_driver, + &incoming_msg_tx, + &capabilities_provider, + action, + ) + .await?; + } } Ok(()) } - async fn process_action(&mut self, action: Action) -> Result<(), ()> { + /// Process a single [`Action`]. + async fn process_action( + &mut self, + matrix_driver: &MatrixDriver, + incoming_msg_tx: &UnboundedSender, + capabilities_provider: &impl CapabilitiesProvider, + action: Action, + ) -> Result<(), ()> { match action { Action::SendToWidget(msg) => { self.to_widget_tx.send(msg).await.map_err(|_| ())?; @@ -199,29 +197,25 @@ impl ProcessingContext { Action::MatrixDriverRequest { request_id, data } => { let response = match data { MatrixDriverRequestData::AcquireCapabilities(cmd) => { - let obtained = self - .capabilities_provider + let obtained = capabilities_provider .acquire_capabilities(cmd.desired_capabilities) .await; Ok(MatrixDriverResponse::CapabilitiesAcquired(obtained)) } - MatrixDriverRequestData::GetOpenId => self - .matrix_driver + MatrixDriverRequestData::GetOpenId => matrix_driver .get_open_id() .await .map(MatrixDriverResponse::OpenIdReceived) .map_err(|e| e.to_string()), - MatrixDriverRequestData::ReadMessageLikeEvent(cmd) => self - .matrix_driver + MatrixDriverRequestData::ReadMessageLikeEvent(cmd) => matrix_driver .read_message_like_events(cmd.event_type.clone(), cmd.limit) .await .map(MatrixDriverResponse::MatrixEventRead) .map_err(|e| e.to_string()), - MatrixDriverRequestData::ReadStateEvent(cmd) => self - .matrix_driver + MatrixDriverRequestData::ReadStateEvent(cmd) => matrix_driver .read_state_events(cmd.event_type.clone(), &cmd.state_key) .await .map(MatrixDriverResponse::MatrixEventRead) @@ -236,50 +230,57 @@ impl ProcessingContext { let delay_event_parameter = delay.map(|d| DelayParameters::Timeout { timeout: Duration::from_millis(d), }); - self.matrix_driver + matrix_driver .send(event_type, state_key, content, delay_event_parameter) .await .map(MatrixDriverResponse::MatrixEventSent) .map_err(|e: crate::Error| e.to_string()) } - MatrixDriverRequestData::UpdateDelayedEvent(req) => self - .matrix_driver + MatrixDriverRequestData::UpdateDelayedEvent(req) => matrix_driver .update_delayed_event(req.delay_id, req.action) .await .map(MatrixDriverResponse::MatrixDelayedEventUpdate) .map_err(|e: HttpError| e.to_string()), }; - self.events_tx + // Forward the matrix driver response to the incoming message stream. + incoming_msg_tx .send(IncomingMessage::MatrixDriverResponse { request_id, response }) .map_err(|_| ())?; } Action::Subscribe => { // Only subscribe if we are not already subscribed. - if self.event_forwarding_guard.is_none() { - let (stop_forwarding, guard) = { - let token = CancellationToken::new(); - (token.child_token(), token.drop_guard()) - }; - - self.event_forwarding_guard = Some(guard); - let (mut matrix, events_tx) = - (self.matrix_driver.events(), self.events_tx.clone()); - - tokio::spawn(async move { - loop { - tokio::select! { - _ = stop_forwarding.cancelled() => { return } - - Some(event) = matrix.recv() => { - let _ = events_tx.send(IncomingMessage::MatrixEventReceived(event)); - } + if self.event_forwarding_guard.is_some() { + return Ok(()); + } + + let (stop_forwarding, guard) = { + let token = CancellationToken::new(); + (token.child_token(), token.drop_guard()) + }; + + self.event_forwarding_guard = Some(guard); + + let mut matrix = matrix_driver.events(); + let incoming_msg_tx = incoming_msg_tx.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = stop_forwarding.cancelled() => { + // Upon cancellation, stop this task. + return; + } + + Some(event) = matrix.recv() => { + // Forward all events to the incoming messages stream. + let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event)); } } - }); - } + } + }); } Action::Unsubscribe => {