Skip to content

Commit

Permalink
refactor(widget): get rid of ProcessingContext and inline it in its…
Browse files Browse the repository at this point in the history
… callers
  • Loading branch information
bnjbvr committed Nov 14, 2024
1 parent 535fada commit 21b4c94
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 78 deletions.
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/widget/machine/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) enum IncomingMessage {
/// The `MatrixDriver` notified the `WidgetMachine` of a new matrix event.
///
/// This means that the machine previously subscribed to some events
/// (`Action::Subscribe` request).
/// ([`crate::widget::Action::Subscribe`] request).
MatrixEventReceived(Raw<AnyTimelineEvent>),
}

Expand Down
11 changes: 8 additions & 3 deletions crates/matrix-sdk/src/widget/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ pub(crate) use self::{
incoming::{IncomingMessage, MatrixDriverResponse},
};

/// Action (a command) that client (driver) must perform.
/// A command to perform in reaction to an [`IncomingMessage`].
///
/// There are also initial actions that may be performed at the creation of a
/// [`WidgetMachine`].
#[derive(Debug)]
pub(crate) enum Action {
/// Send a raw message to the widget.
Expand Down Expand Up @@ -144,8 +147,10 @@ impl WidgetMachine {
capabilities: CapabilitiesState::Unset,
};

let actions = (!init_on_content_load).then(|| machine.negotiate_capabilities());
(machine, actions.unwrap_or_default())
let initial_actions =
if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() };

(machine, initial_actions)
}

/// Main entry point to drive the state machine.
Expand Down
149 changes: 75 additions & 74 deletions crates/matrix-sdk/src/widget/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub struct WidgetDriver {
///
/// These can be both requests and responses.
to_widget_tx: Sender<String>,

/// Drop guard for an event handler forwarding all events from the Matrix
/// room to the widget.
///
/// Only set if a subscription happened ([`Action::Subscribe`]).
event_forwarding_guard: Option<DropGuard>,
}

/// A handle that encapsulates the communication between a widget driver and the
Expand Down Expand Up @@ -109,7 +115,7 @@ impl WidgetDriver {
let (from_widget_tx, from_widget_rx) = async_channel::unbounded();
let (to_widget_tx, to_widget_rx) = async_channel::unbounded();

let driver = Self { settings, from_widget_rx, to_widget_tx };
let driver = Self { settings, from_widget_rx, to_widget_tx, event_forwarding_guard: None };
let channels = WidgetDriverHandle { from_widget_tx, to_widget_rx };

(driver, channels)
Expand All @@ -120,77 +126,69 @@ impl WidgetDriver {
/// The function returns once the widget is disconnected or any terminal
/// error occurs.
pub async fn run(
self,
mut self,
room: Room,
capabilities_provider: impl CapabilitiesProvider,
) -> Result<(), ()> {
// Create a channel so that we can conveniently send all messages to it.
let (incoming_messages_tx, mut incoming_messages_rx) = unbounded_channel();
//
// It will receive:
// - all incoming messages from the widget
// - all responses from the Matrix driver
// - all events from the Matrix driver, if subscribed
let (incoming_msg_tx, mut incoming_msg_rx) = unbounded_channel();

// Forward all of the incoming messages from the widget.
tokio::spawn({
let incoming_messages_tx = incoming_messages_tx.clone();
let incoming_msg_tx = incoming_msg_tx.clone();
let from_widget_rx = self.from_widget_rx.clone();
async move {
while let Ok(msg) = self.from_widget_rx.recv().await {
let _ = incoming_messages_tx.send(IncomingMessage::WidgetMessage(msg));
while let Ok(msg) = from_widget_rx.recv().await {
let _ = incoming_msg_tx.send(IncomingMessage::WidgetMessage(msg));
}
}
});

// Create widget API machine.
let (client_api, initial_actions) = WidgetMachine::new(
let (mut widget_machine, initial_actions) = WidgetMachine::new(
self.settings.widget_id().to_owned(),
room.room_id().to_owned(),
self.settings.init_on_content_load(),
None,
);

// The environment for the processing of actions from the widget machine.
let mut ctx = ProcessingContext {
widget_machine: client_api,
matrix_driver: MatrixDriver::new(room.clone()),
event_forwarding_guard: None,
to_widget_tx: self.to_widget_tx,
events_tx: incoming_messages_tx,
capabilities_provider,
};
let matrix_driver = MatrixDriver::new(room.clone());

// Process initial actions that "initialise" the widget api machine.
for action in initial_actions {
ctx.process_action(action).await?;
self.process_action(&matrix_driver, &incoming_msg_tx, &capabilities_provider, action)
.await?;
}

// Process incoming messages as they're coming in.
while let Some(message) = incoming_messages_rx.recv().await {
ctx.process_incoming_message(message).await?;
}

Ok(())
}
}

/// A small wrapper of all the data that we need to process an incoming event.
struct ProcessingContext<T> {
widget_machine: WidgetMachine,
matrix_driver: MatrixDriver,
event_forwarding_guard: Option<DropGuard>,
to_widget_tx: Sender<String>,
events_tx: UnboundedSender<IncomingMessage>,
capabilities_provider: T,
}

impl<T: CapabilitiesProvider> ProcessingContext<T> {
/// Compute the actions for a single given incoming message, and performs
/// them immediately.
async fn process_incoming_message(&mut self, msg: IncomingMessage) -> Result<(), ()> {
for action in self.widget_machine.process(msg) {
self.process_action(action).await?;
while let Some(msg) = incoming_msg_rx.recv().await {
for action in widget_machine.process(msg) {
self.process_action(
&matrix_driver,
&incoming_msg_tx,
&capabilities_provider,
action,
)
.await?;
}
}

Ok(())
}

async fn process_action(&mut self, action: Action) -> Result<(), ()> {
/// Process a single [`Action`].
async fn process_action(
&mut self,
matrix_driver: &MatrixDriver,
incoming_msg_tx: &UnboundedSender<IncomingMessage>,
capabilities_provider: &impl CapabilitiesProvider,
action: Action,
) -> Result<(), ()> {
match action {
Action::SendToWidget(msg) => {
self.to_widget_tx.send(msg).await.map_err(|_| ())?;
Expand All @@ -199,29 +197,25 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
Action::MatrixDriverRequest { request_id, data } => {
let response = match data {
MatrixDriverRequestData::AcquireCapabilities(cmd) => {
let obtained = self
.capabilities_provider
let obtained = capabilities_provider
.acquire_capabilities(cmd.desired_capabilities)
.await;
Ok(MatrixDriverResponse::CapabilitiesAcquired(obtained))
}

MatrixDriverRequestData::GetOpenId => self
.matrix_driver
MatrixDriverRequestData::GetOpenId => matrix_driver
.get_open_id()
.await
.map(MatrixDriverResponse::OpenIdReceived)
.map_err(|e| e.to_string()),

MatrixDriverRequestData::ReadMessageLikeEvent(cmd) => self
.matrix_driver
MatrixDriverRequestData::ReadMessageLikeEvent(cmd) => matrix_driver
.read_message_like_events(cmd.event_type.clone(), cmd.limit)
.await
.map(MatrixDriverResponse::MatrixEventRead)
.map_err(|e| e.to_string()),

MatrixDriverRequestData::ReadStateEvent(cmd) => self
.matrix_driver
MatrixDriverRequestData::ReadStateEvent(cmd) => matrix_driver
.read_state_events(cmd.event_type.clone(), &cmd.state_key)
.await
.map(MatrixDriverResponse::MatrixEventRead)
Expand All @@ -236,50 +230,57 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
let delay_event_parameter = delay.map(|d| DelayParameters::Timeout {
timeout: Duration::from_millis(d),
});
self.matrix_driver
matrix_driver
.send(event_type, state_key, content, delay_event_parameter)
.await
.map(MatrixDriverResponse::MatrixEventSent)
.map_err(|e: crate::Error| e.to_string())
}

MatrixDriverRequestData::UpdateDelayedEvent(req) => self
.matrix_driver
MatrixDriverRequestData::UpdateDelayedEvent(req) => matrix_driver
.update_delayed_event(req.delay_id, req.action)
.await
.map(MatrixDriverResponse::MatrixDelayedEventUpdate)
.map_err(|e: HttpError| e.to_string()),
};

self.events_tx
// Forward the matrix driver response to the incoming message stream.
incoming_msg_tx
.send(IncomingMessage::MatrixDriverResponse { request_id, response })
.map_err(|_| ())?;
}

Action::Subscribe => {
// Only subscribe if we are not already subscribed.
if self.event_forwarding_guard.is_none() {
let (stop_forwarding, guard) = {
let token = CancellationToken::new();
(token.child_token(), token.drop_guard())
};

self.event_forwarding_guard = Some(guard);
let (mut matrix, events_tx) =
(self.matrix_driver.events(), self.events_tx.clone());

tokio::spawn(async move {
loop {
tokio::select! {
_ = stop_forwarding.cancelled() => { return }

Some(event) = matrix.recv() => {
let _ = events_tx.send(IncomingMessage::MatrixEventReceived(event));
}
if self.event_forwarding_guard.is_some() {
return Ok(());
}

let (stop_forwarding, guard) = {
let token = CancellationToken::new();
(token.child_token(), token.drop_guard())
};

self.event_forwarding_guard = Some(guard);

let mut matrix = matrix_driver.events();
let incoming_msg_tx = incoming_msg_tx.clone();

tokio::spawn(async move {
loop {
tokio::select! {
_ = stop_forwarding.cancelled() => {
// Upon cancellation, stop this task.
return;
}

Some(event) = matrix.recv() => {
// Forward all events to the incoming messages stream.
let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event));
}
}
});
}
}
});
}

Action::Unsubscribe => {
Expand Down

0 comments on commit 21b4c94

Please sign in to comment.