diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 6210777083f..187cd818813 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -128,8 +128,7 @@ impl EventCache { let inner = Arc::new(EventCacheInner { client: Arc::downgrade(client), by_room: Default::default(), - store, - process_lock: Default::default(), + store: Arc::new(Mutex::new(store)), drop_handles: Default::default(), }); @@ -181,9 +180,10 @@ impl EventCache { // Forget everything we know; we could have missed events, and we have // no way to reconcile at the moment! // TODO: implement Smart Matching™, + let store = inner.store.lock().await; let mut by_room = inner.by_room.write().await; for room_id in by_room.keys() { - if let Err(err) = inner.store.clear_room_events(room_id).await { + if let Err(err) = store.clear_room_events(room_id).await { error!("unable to clear room after room updates lag: {err}"); } } @@ -230,10 +230,12 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. - self.inner.store.clear_room_events(room_id).await?; + let store = self.inner.store.lock().await; + + store.clear_room_events(room_id).await?; let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); - room_cache.inner.append_events(events).await?; + room_cache.inner.append_events(&**store, events).await?; Ok(()) } @@ -248,14 +250,13 @@ struct EventCacheInner { by_room: RwLock>, /// Backend used for storage. - store: Arc, - - /// A lock to make sure that despite multiple updates coming to the - /// `EventCache`, it will only handle one at a time. /// /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to - /// ensure that multiple updates will be applied in the correct order. - process_lock: Mutex<()>, + /// ensure that multiple updates will be applied in the correct order, which + /// is enforced by taking the store lock when handling an update. + /// + /// TODO: replace with a cross-process lock + store: Arc>>, /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, @@ -271,7 +272,7 @@ impl EventCacheInner { async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> { // First, take the lock that indicates we're processing updates, to avoid // handling multiple updates concurrently. - let _process_lock = self.process_lock.lock().await; + let store = self.store.lock().await; // Left rooms. for (room_id, left_room_update) in updates.leave { @@ -280,7 +281,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { + if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -293,7 +294,9 @@ impl EventCacheInner { continue; }; - if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { + if let Err(err) = + room.inner.handle_joined_room_update(&**store, joined_room_update).await + { // Non-fatal error, try to continue to the next room. error!("handling joined room update: {err}"); } @@ -358,7 +361,7 @@ impl Debug for RoomEventCache { impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. - fn new(room: Room, store: Arc) -> Self { + fn new(room: Room, store: Arc>>) -> Self { Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) } } @@ -369,10 +372,9 @@ impl RoomEventCache { pub async fn subscribe( &self, ) -> Result<(Vec, Receiver)> { - Ok(( - self.inner.store.room_events(self.inner.room.room_id()).await?, - self.inner.sender.subscribe(), - )) + let store = self.inner.store.lock().await; + + Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe())) } } @@ -381,8 +383,10 @@ struct RoomEventCacheInner { /// Sender part for subscribers to this room. sender: Sender, - /// A pointer to the store implementation used for this event cache. - store: Arc, + /// Backend used for storage, shared with the parent [`EventCacheInner`]. + /// + /// See comment there. + store: Arc>>, /// The Client [`Room`] this event cache pertains to. room: Room, @@ -391,13 +395,18 @@ struct RoomEventCacheInner { impl RoomEventCacheInner { /// Creates a new cache for a room, and subscribes to room updates, so as /// to handle new timeline events. - fn new(room: Room, store: Arc) -> Self { + fn new(room: Room, store: Arc>>) -> Self { let sender = Sender::new(32); Self { room, store, sender } } - async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + async fn handle_joined_room_update( + &self, + store: &dyn EventCacheStore, + updates: JoinedRoomUpdate, + ) -> Result<()> { self.handle_timeline( + store, updates.timeline, updates.ephemeral.clone(), updates.account_data, @@ -409,6 +418,7 @@ impl RoomEventCacheInner { async fn handle_timeline( &self, + store: &dyn EventCacheStore, timeline: Timeline, ephemeral: Vec>, account_data: Vec>, @@ -419,7 +429,7 @@ impl RoomEventCacheInner { // timeline, but we're not there yet. In the meanwhile, clear the // items from the room. TODO: implement Smart Matching™. trace!("limited timeline, clearing all previous events"); - self.store.clear_room_events(self.room.room_id()).await?; + store.clear_room_events(self.room.room_id()).await?; let _ = self.sender.send(RoomEventCacheUpdate::Clear); } @@ -431,7 +441,7 @@ impl RoomEventCacheInner { || !ambiguity_changes.is_empty() { trace!("adding new events"); - self.store.add_room_events(self.room.room_id(), timeline.events.clone()).await?; + store.add_room_events(self.room.room_id(), timeline.events.clone()).await?; // Propagate events to observers. let _ = self.sender.send(RoomEventCacheUpdate::Append { @@ -446,20 +456,34 @@ impl RoomEventCacheInner { Ok(()) } - async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes) - .await?; + async fn handle_left_room_update( + &self, + store: &dyn EventCacheStore, + updates: LeftRoomUpdate, + ) -> Result<()> { + self.handle_timeline( + store, + updates.timeline, + Vec::new(), + Vec::new(), + updates.ambiguity_changes, + ) + .await?; Ok(()) } /// Append a set of events to the room cache and storage, notifying /// observers. - async fn append_events(&self, events: Vec) -> Result<()> { + async fn append_events( + &self, + store: &dyn EventCacheStore, + events: Vec, + ) -> Result<()> { if events.is_empty() { return Ok(()); } - self.store.add_room_events(self.room.room_id(), events.clone()).await?; + store.add_room_events(self.room.room_id(), events.clone()).await?; let _ = self.sender.send(RoomEventCacheUpdate::Append { events,