diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 1087bdd4813..c72a39f1bf1 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -62,16 +62,17 @@ use ruma::{ use tokio::{ sync::{ broadcast::{error::RecvError, Receiver, Sender}, - Mutex, Notify, RwLock, + Mutex, Notify, RwLock, RwLockReadGuard, RwLockWriteGuard, }, time::timeout, }; use tracing::{error, instrument, trace, warn}; -use self::store::{EventCacheStore, MemoryStore, TimelineEntry}; -use crate::{ - client::ClientInner, event_cache::store::PaginationToken, room::MessagesOptions, Client, Room, +use self::{ + linked_chunk::ChunkContent, + store::{Gap, PaginationToken, RoomEvents}, }; +use crate::{client::ClientInner, room::MessagesOptions, Client, Room}; mod linked_chunk; mod store; @@ -149,15 +150,14 @@ impl Debug for EventCache { impl EventCache { /// Create a new [`EventCache`] for the given client. pub(crate) fn new(client: &Arc) -> Self { - let store = Arc::new(MemoryStore::new()); - let inner = Arc::new(EventCacheInner { - client: Arc::downgrade(client), - by_room: Default::default(), - store: Arc::new(Mutex::new(store)), - drop_handles: Default::default(), - }); - - Self { inner } + Self { + inner: Arc::new(EventCacheInner { + client: Arc::downgrade(client), + multiple_room_updates_lock: Default::default(), + by_room: Default::default(), + drop_handles: Default::default(), + }), + } } /// Starts subscribing the [`EventCache`] to sync responses, if not done @@ -205,14 +205,7 @@ impl EventCache { // Forget everything we know; we could have missed events, and we have // no way to reconcile at the moment! // TODO: implement Smart Matching™, - let store = inner.store.lock().await; - let mut by_room = inner.by_room.write().await; - for room_id in by_room.keys() { - if let Err(err) = store.clear_room(room_id).await { - error!("unable to clear room after room updates lag: {err}"); - } - } - by_room.clear(); + inner.by_room.write().await.clear(); } Err(RecvError::Closed) => { @@ -256,15 +249,10 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. - let store = self.inner.store.lock().await; - - store.clear_room(room_id).await?; - let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache .inner - .append_events( - &**store, + .replace_all_events_by( events, prev_batch, Default::default(), @@ -282,17 +270,16 @@ struct EventCacheInner { /// on the owning client. client: Weak, - /// Lazily-filled cache of live [`RoomEventCache`], once per room. - by_room: RwLock>, - - /// Backend used for storage. + /// A lock used when many rooms must be updated at once. /// /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to /// ensure that multiple updates will be applied in the correct order, which - /// is enforced by taking the store lock when handling an update. - /// - /// TODO: replace with a cross-process lock - store: Arc>>, + /// is enforced by taking this lock when handling an update. + // TODO: that's the place to add a cross-process lock! + multiple_room_updates_lock: Mutex<()>, + + /// Lazily-filled cache of live [`RoomEventCache`], once per room. + by_room: RwLock>, /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, @@ -308,7 +295,7 @@ impl EventCacheInner { async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> { // First, take the lock that indicates we're processing updates, to avoid // handling multiple updates concurrently. - let store = self.store.lock().await; + let _lock = self.multiple_room_updates_lock.lock().await; // Left rooms. for (room_id, left_room_update) in updates.leave { @@ -317,7 +304,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await { + if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -330,9 +317,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = - room.inner.handle_joined_room_update(&**store, joined_room_update).await - { + if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling joined room update: {err}"); } @@ -371,7 +356,7 @@ impl EventCacheInner { return Ok(None); }; - let room_event_cache = RoomEventCache::new(room, self.store.clone()); + let room_event_cache = RoomEventCache::new(room); by_room_guard.insert(room_id.to_owned(), room_event_cache.clone()); @@ -397,8 +382,8 @@ impl Debug for RoomEventCache { impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. - fn new(room: Room, store: Arc>>) -> Self { - Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) } + fn new(room: Room) -> Self { + Self { inner: Arc::new(RoomEventCacheInner::new(room)) } } /// Subscribe to room updates for this room, after getting the initial list @@ -408,9 +393,10 @@ impl RoomEventCache { pub async fn subscribe( &self, ) -> Result<(Vec, Receiver)> { - let store = self.inner.store.lock().await; + let events = + self.inner.events.read().await.events().map(|(_position, item)| item.clone()).collect(); - Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe())) + Ok((events, self.inner.sender.subscribe())) } /// Returns the oldest back-pagination token, that is, the one closest to @@ -447,14 +433,12 @@ struct RoomEventCacheInner { /// Sender part for subscribers to this room. sender: Sender, - /// Backend used for storage, shared with the parent [`EventCacheInner`]. - /// - /// See comment there. - store: Arc>>, - /// The Client [`Room`] this event cache pertains to. room: Room, + /// The events of the room. + events: RwLock, + /// A notifier that we received a new pagination token. pagination_token_notifier: Notify, @@ -466,24 +450,20 @@ struct RoomEventCacheInner { impl RoomEventCacheInner { /// Creates a new cache for a room, and subscribes to room updates, so as /// to handle new timeline events. - fn new(room: Room, store: Arc>>) -> Self { + fn new(room: Room) -> Self { let sender = Sender::new(32); + Self { room, - store, + events: RwLock::new(RoomEvents::default()), sender, pagination_lock: Default::default(), pagination_token_notifier: Default::default(), } } - async fn handle_joined_room_update( - &self, - store: &dyn EventCacheStore, - updates: JoinedRoomUpdate, - ) -> Result<()> { + async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { self.handle_timeline( - store, updates.timeline, updates.ephemeral.clone(), updates.account_data, @@ -495,7 +475,6 @@ impl RoomEventCacheInner { async fn handle_timeline( &self, - store: &dyn EventCacheStore, timeline: Timeline, ephemeral: Vec>, account_data: Vec>, @@ -505,51 +484,99 @@ impl RoomEventCacheInner { // Ideally we'd try to reconcile existing events against those received in the // timeline, but we're not there yet. In the meanwhile, clear the // items from the room. TODO: implement Smart Matching™. - trace!("limited timeline, clearing all previous events"); + trace!("limited timeline, clearing all previous events and pushing new events"); + + self.replace_all_events_by( + timeline.events, + timeline.prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await?; + } else { + // Add all the events to the backend. + trace!("adding new events"); + + self.append_new_events( + timeline.events, + timeline.prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await?; + } - // Clear internal state (events, pagination tokens, etc.). - store.clear_room(self.room.room_id()).await?; + Ok(()) + } - // Propagate to observers. - let _ = self.sender.send(RoomEventCacheUpdate::Clear); - } + async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes) + .await?; + Ok(()) + } - // Add all the events to the backend. - trace!("adding new events"); - self.append_events( - store, - timeline.events, - timeline.prev_batch, + /// Remove existing events, and append a set of events to the room cache and + /// storage, notifying observers. + async fn replace_all_events_by( + &self, + events: Vec, + prev_batch: Option, + account_data: Vec>, + ephemeral: Vec>, + ambiguity_changes: BTreeMap, + ) -> Result<()> { + // Acquire the lock. + let mut room_events = self.events.write().await; + + // Reset the events. + room_events.reset(); + + // Propagate to observers. + let _ = self.sender.send(RoomEventCacheUpdate::Clear); + + // Push the new events. + self.append_events_locked_impl( + room_events, + events, + prev_batch, account_data, ephemeral, ambiguity_changes, ) - .await?; - - Ok(()) + .await } - async fn handle_left_room_update( + /// Append a set of events to the room cache and storage, notifying + /// observers. + async fn append_new_events( &self, - store: &dyn EventCacheStore, - updates: LeftRoomUpdate, + events: Vec, + prev_batch: Option, + account_data: Vec>, + ephemeral: Vec>, + ambiguity_changes: BTreeMap, ) -> Result<()> { - self.handle_timeline( - store, - updates.timeline, - Vec::new(), - Vec::new(), - updates.ambiguity_changes, + self.append_events_locked_impl( + self.events.write().await, + events, + prev_batch, + account_data, + ephemeral, + ambiguity_changes, ) - .await?; - Ok(()) + .await } - /// Append a set of events to the room cache and storage, notifying - /// observers. - async fn append_events( + /// Append a set of events, with an attached lock. + /// + /// If the lock `room_events` is `None`, one will be created. + /// + /// This is a private implementation. It must not be exposed publicly. + async fn append_events_locked_impl( &self, - store: &dyn EventCacheStore, + mut room_events: RwLockWriteGuard<'_, RoomEvents>, events: Vec, prev_batch: Option, account_data: Vec>, @@ -565,22 +592,18 @@ impl RoomEventCacheInner { return Ok(()); } - let room_id = self.room.room_id(); - // Add the previous back-pagination token (if present), followed by the timeline // events themselves. - let gap_with_token = prev_batch - .clone() - .map(|val| TimelineEntry::Gap { prev_token: PaginationToken(val) }) - .into_iter(); - - store - .append_room_entries( - room_id, - gap_with_token.chain(events.iter().cloned().map(TimelineEntry::Event)).collect(), - ) - .await?; + { + if let Some(prev_token) = &prev_batch { + room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) }); + } + room_events.push_events(events.clone().into_iter()); + } + + // Now that all events have been added, we can trigger the + // `pagination_token_notifier`. if prev_batch.is_some() { self.pagination_token_notifier.notify_one(); } @@ -610,13 +633,7 @@ impl RoomEventCacheInner { // Make sure there's at most one back-pagination request. let _guard = self.pagination_lock.lock().await; - if let Some(token) = token.as_ref() { - let store = self.store.lock().await; - if !store.contains_gap(self.room.room_id(), token).await? { - return Err(EventCacheError::UnknownBackpaginationToken); - } - } - + // Get messages. let messages = self .room .messages(assign!(MessagesOptions::backward(), { @@ -626,10 +643,32 @@ impl RoomEventCacheInner { .await .map_err(EventCacheError::SdkError)?; + // Make sure the `RoomEvents` isn't updated while we are saving events from + // backpagination. + let mut room_events = self.events.write().await; + + // Check that the `token` exists if any. + let gap_identifier = if let Some(token) = token.as_ref() { + let gap_identifier = room_events.chunk_identifier(|chunk| { + matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token) + }); + + // The method has been called with `token` but it doesn't exist in `RoomEvents`, + // it's an error. + if gap_identifier.is_none() { + return Ok(BackPaginationOutcome::UnknownBackpaginationToken); + } + + gap_identifier + } else { + None + }; + // Would we want to backpaginate again, we'd start from the `end` token as the // next `from` token. - let prev_token = messages.end; + let prev_token = + messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) }); // If this token is missing, then we've reached the end of the timeline. let reached_start = prev_token.is_none(); @@ -640,41 +679,78 @@ impl RoomEventCacheInner { // should be prepended first). let events = messages.chunk; - // Prepend the previous token (if any) at the beginning of the timeline, - // followed by the events received in the response (in reverse order). - let new_gap = prev_token - .map(|token| TimelineEntry::Gap { prev_token: PaginationToken(token) }) - .into_iter(); + let sync_events = events + .iter() + // Reverse the order of the events as `/messages` has been called with `dir=b` + // (backward). The `RoomEvents` API expects the first event to be the oldest. + .rev() + .cloned() + .map(SyncTimelineEvent::from); + + // There is a `token`/gap, let's replace it by new events! + if let Some(gap_identifier) = gap_identifier { + let new_position = { + // Replace the gap by new events. + let new_chunk = room_events + .replace_gap_at(sync_events, gap_identifier) + // SAFETY: we are sure that `gap_identifier` represents a valid + // `ChunkIdentifier` for a `Gap` chunk. + .expect("The `gap_identifier` must represent a `Gap`"); + + new_chunk.first_position() + }; - // For storage, reverse events to store them in the normal (non-reversed order). - // - // It's fine to convert from `TimelineEvent` (i.e. that has a room id) to - // `SyncTimelineEvent` (i.e. that doesn't have it), because those events are - // always tied to a room in storage anyways. - let new_events = events.iter().rev().map(|ev| TimelineEntry::Event(ev.clone().into())); - - let replaced = self - .store - .lock() - .await - .replace_gap(self.room.room_id(), token.as_ref(), new_gap.chain(new_events).collect()) - .await?; + // And insert a new gap if there is any `prev_token`. + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, new_position) + // SAFETY: we are sure that `new_position` represents a valid + // `ChunkIdentifier` for an `Item` chunk. + .expect("The `new_position` must represent an `Item`"); + } - if !replaced { - // The previous token disappeared! - // This can happen if we got a limited timeline and lost track of our pagination - // token, because the whole timeline has been reset. - // - // TODO: With smarter reconciliation, this might get away. In the meanwhile, - // early return and forget about all the events. - trace!("gap was missing, likely because we observed a gappy sync response"); - Ok(BackPaginationOutcome::UnknownBackpaginationToken) - } else { trace!("replaced gap with new events from backpagination"); // TODO: implement smarter reconciliation later //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); + Ok(BackPaginationOutcome::Success { events, reached_start }) + } else { + // There is no `token`/gap identifier. Let's assume we must prepend the new + // events. + let first_item_position = + room_events.events().next().map(|(item_position, _)| item_position); + + match first_item_position { + // Is there a first item? Insert at this position. + Some(first_item_position) => { + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, first_item_position) + // SAFETY: The `first_item_position` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. Also, it can only represent a valid + // `ChunkIdentifier` as the data structure isn't modified yet. + .expect("The `first_item_position` must represent a valid `Item`"); + } + + room_events + .insert_events_at(sync_events, first_item_position) + // SAFETY: The `first_item_position` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. The chunk it points to has not been + // removed. + .expect("The `first_item_position` must represent an `Item`"); + } + + // There is no first item. Let's simply push. + None => { + if let Some(prev_token_gap) = prev_token { + room_events.push_gap(prev_token_gap); + } + + room_events.push_events(sync_events); + } + } + Ok(BackPaginationOutcome::Success { events, reached_start }) } } @@ -689,14 +765,19 @@ impl RoomEventCacheInner { max_wait: Option, ) -> Result> { // Optimistically try to return the backpagination token immediately. - if let Some(token) = - self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await? - { + fn get_oldest(room_events: RwLockReadGuard<'_, RoomEvents>) -> Option { + room_events.chunks().find_map(|chunk| match chunk.content() { + ChunkContent::Gap(gap) => Some(gap.prev_token.clone()), + ChunkContent::Items(..) => None, + }) + } + + if let Some(token) = get_oldest(self.events.read().await) { return Ok(Some(token)); } let Some(max_wait) = max_wait else { - // We had no token and no time to wait, so... no tokens. + // We had no token and no time to wait, so… no tokens. return Ok(None); }; @@ -704,7 +785,7 @@ impl RoomEventCacheInner { // Timeouts are fine, per this function's contract. let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await; - self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await + Ok(get_oldest(self.events.read().await)) } } @@ -758,13 +839,12 @@ pub enum RoomEventCacheUpdate { #[cfg(test)] mod tests { - use assert_matches2::assert_matches; use matrix_sdk_common::executor::spawn; use matrix_sdk_test::{async_test, sync_timeline_event}; use ruma::room_id; - use super::{store::TimelineEntry, EventCacheError}; + use super::{BackPaginationOutcome, EventCacheError}; use crate::{event_cache::store::PaginationToken, test_utils::logged_in_client}; #[async_test] @@ -783,70 +863,84 @@ mod tests { assert_matches!(result, Err(EventCacheError::NotSubscribedYet)); } - #[async_test] - async fn test_unknown_pagination_token() { - let client = logged_in_client(None).await; - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - - client.event_cache().subscribe().unwrap(); - - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - // If I try to back-paginate with an unknown back-pagination token, - let token = PaginationToken("old".to_owned()); - - // Then I run into an error. - let res = room_event_cache.backpaginate(20, Some(token)).await; - assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken); - } - // Those tests require time to work, and it does not on wasm32. #[cfg(not(target_arch = "wasm32"))] mod time_tests { use std::time::{Duration, Instant}; + use matrix_sdk_base::RoomState; + use serde_json::json; use tokio::time::sleep; + use wiremock::{ + matchers::{header, method, path_regex, query_param}, + Mock, ResponseTemplate, + }; - use super::*; + use super::{super::store::Gap, *}; + use crate::test_utils::logged_in_client_with_server; #[async_test] - async fn test_wait_no_pagination_token() { - let client = logged_in_client(None).await; + async fn test_unknown_pagination_token() { + let (client, server) = logged_in_client_with_server().await; + let room_id = room_id!("!galette:saucisse.bzh"); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); client.event_cache().subscribe().unwrap(); - // When I only have events in a room, - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![TimelineEntry::Event( - sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into(), - )], - ) - .await - .unwrap(); - - let (room_event_cache, _drop_handlers) = + let (room_event_cache, _drop_handles) = client.event_cache().for_room(room_id).await.unwrap(); let room_event_cache = room_event_cache.unwrap(); + // If I try to back-paginate with an unknown back-pagination token, + let token_name = "unknown"; + let token = PaginationToken(token_name.to_owned()); + + // Then I run into an error. + Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) + .and(header("authorization", "Bearer 1234")) + .and(query_param("from", token_name)) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "start": token_name, + "chunk": [], + }))) + .expect(1) + .mount(&server) + .await; + + let res = room_event_cache.backpaginate(20, Some(token)).await; + assert_matches!(res, Ok(BackPaginationOutcome::UnknownBackpaginationToken)); + + server.verify().await + } + + #[async_test] + async fn test_wait_no_pagination_token() { + let client = logged_in_client(None).await; + let room_id = room_id!("!galette:saucisse.bzh"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); + + // When I only have events in a room, + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } + // If I don't wait for the backpagination token, let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); // Then I don't find it. @@ -884,39 +978,28 @@ mod tests { let room_id = room_id!("!galette:saucisse.bzh"); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - client.event_cache().subscribe().unwrap(); + let event_cache = client.event_cache(); - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); let room_event_cache = room_event_cache.unwrap(); let expected_token = PaginationToken("old".to_owned()); // When I have events and multiple gaps, in a room, - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![ - TimelineEntry::Gap { prev_token: expected_token.clone() }, - TimelineEntry::Event( - sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into(), - ), - ], - ) - .await - .unwrap(); + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: expected_token.clone() }); + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } // If I don't wait for a back-pagination token, let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); @@ -954,32 +1037,26 @@ mod tests { let room_id = room_id!("!galette:saucisse.bzh"); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - client.event_cache().subscribe().unwrap(); + let event_cache = client.event_cache(); - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); let room_event_cache = room_event_cache.unwrap(); let expected_token = PaginationToken("old".to_owned()); let before = Instant::now(); let cloned_expected_token = expected_token.clone(); + let cloned_room_event_cache = room_event_cache.clone(); let insert_token_task = spawn(async move { // If a backpagination token is inserted after 400 milliseconds, sleep(Duration::from_millis(400)).await; - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![TimelineEntry::Gap { prev_token: cloned_expected_token }], - ) - .await - .unwrap(); + { + let mut room_events = cloned_room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: cloned_expected_token }); + } }); // Then first I don't get it (if I'm not waiting,) diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index f5d12ae97e8..80b87fa5020 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -12,199 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::BTreeMap, fmt, iter::once, result::Result as StdResult}; +use std::{fmt, iter::once}; -use async_trait::async_trait; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; -use ruma::{OwnedRoomId, RoomId}; -use tokio::sync::RwLock; -use super::{ - linked_chunk::{ - Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, - LinkedChunkIterBackward, Position, - }, - Result, +use super::linked_chunk::{ + Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, + LinkedChunkIterBackward, Position, }; -/// A store that can be remember information about the event cache. -/// -/// It really acts as a cache, in the sense that clearing the backing data -/// should not have any irremediable effect, other than providing a lesser user -/// experience. -#[async_trait] -pub trait EventCacheStore: Send + Sync { - /// Returns all the known events for the given room. - async fn room_events(&self, room: &RoomId) -> Result>; - - /// Adds all the entries to the given room's timeline. - async fn append_room_entries(&self, room: &RoomId, entries: Vec) -> Result<()>; - - /// Returns whether the store knows about the given pagination token. - async fn contains_gap(&self, room: &RoomId, pagination_token: &PaginationToken) - -> Result; - - /// Replaces a given gap (identified by its pagination token) with the given - /// entries. - /// - /// Note: if the gap hasn't been found, then nothing happens, and the events - /// are lost. - /// - /// Returns whether the gap was found. - async fn replace_gap( - &self, - room: &RoomId, - gap_id: Option<&PaginationToken>, - entries: Vec, - ) -> Result; - - /// Retrieve the oldest backpagination token for the given room. - async fn oldest_backpagination_token(&self, room: &RoomId) -> Result>; - - /// Clear all the information tied to a given room. - /// - /// This forgets the following: - /// - events in the room - /// - pagination tokens - async fn clear_room(&self, room: &RoomId) -> Result<()>; -} - /// A newtype wrapper for a pagination token returned by a /messages response. #[derive(Clone, Debug, PartialEq)] pub struct PaginationToken(pub String); -#[derive(Clone)] -pub enum TimelineEntry { - Event(SyncTimelineEvent), - - Gap { - /// The token to use in the query, extracted from a previous "from" / - /// "end" field of a `/messages` response. - prev_token: PaginationToken, - }, -} - -/// All the information related to a room and stored in the event cache. -#[derive(Default)] -struct RoomInfo { - /// All the timeline entries per room, in sync order. - entries: Vec, -} - -impl RoomInfo { - fn clear(&mut self) { - self.entries.clear(); - } -} - -/// An [`EventCacheStore`] implementation that keeps all the information in -/// memory. -#[derive(Default)] -pub(crate) struct MemoryStore { - by_room: RwLock>, -} - -impl MemoryStore { - /// Create a new empty [`MemoryStore`]. - pub fn new() -> Self { - Default::default() - } -} - -#[async_trait] -impl EventCacheStore for MemoryStore { - async fn room_events(&self, room: &RoomId) -> Result> { - Ok(self - .by_room - .read() - .await - .get(room) - .map(|room_info| { - room_info - .entries - .iter() - .filter_map( - |entry| if let TimelineEntry::Event(ev) = entry { Some(ev) } else { None }, - ) - .cloned() - .collect() - }) - .unwrap_or_default()) - } - - async fn append_room_entries(&self, room: &RoomId, entries: Vec) -> Result<()> { - self.by_room.write().await.entry(room.to_owned()).or_default().entries.extend(entries); - Ok(()) - } - - async fn clear_room(&self, room: &RoomId) -> Result<()> { - // Clear the room, so as to avoid reallocations if the room is being reused. - // XXX: do we also want an actual way to *remove* a room? (for left rooms) - if let Some(room) = self.by_room.write().await.get_mut(room) { - room.clear(); - } - - Ok(()) - } - - async fn oldest_backpagination_token(&self, room: &RoomId) -> Result> { - Ok(self.by_room.read().await.get(room).and_then(|room| { - room.entries.iter().find_map(|entry| { - if let TimelineEntry::Gap { prev_token: backpagination_token } = entry { - Some(backpagination_token.clone()) - } else { - None - } - }) - })) - } - - async fn contains_gap(&self, room: &RoomId, needle: &PaginationToken) -> Result { - let mut by_room_guard = self.by_room.write().await; - let room = by_room_guard.entry(room.to_owned()).or_default(); - - Ok(room.entries.iter().any(|entry| { - if let TimelineEntry::Gap { prev_token: existing } = entry { - existing == needle - } else { - false - } - })) - } - - async fn replace_gap( - &self, - room: &RoomId, - token: Option<&PaginationToken>, - entries: Vec, - ) -> Result { - let mut by_room_guard = self.by_room.write().await; - let room = by_room_guard.entry(room.to_owned()).or_default(); - - if let Some(token) = token { - let gap_pos = room.entries.iter().enumerate().find_map(|(i, t)| { - if let TimelineEntry::Gap { prev_token: existing } = t { - if existing == token { - return Some(i); - } - } - None - }); - - if let Some(pos) = gap_pos { - room.entries.splice(pos..pos + 1, entries); - Ok(true) - } else { - Ok(false) - } - } else { - // We had no previous token: assume we can prepend the events. - room.entries.splice(0..0, entries); - Ok(true) - } - } -} - #[derive(Debug)] pub struct Gap { /// The token to use in the query, extracted from a previous "from" / @@ -230,6 +50,11 @@ impl RoomEvents { Self { chunks: LinkedChunk::new() } } + /// Clear all events. + pub fn reset(&mut self) { + self.chunks = LinkedChunk::new(); + } + /// Return the number of events. pub fn len(&self) -> usize { self.chunks.len() @@ -240,7 +65,7 @@ impl RoomEvents { self.push_events(once(event)) } - /// Push events after existing events. + /// Push events after all events or gaps. /// /// The last event in `events` is the most recent one. pub fn push_events(&mut self, events: I) @@ -251,12 +76,17 @@ impl RoomEvents { self.chunks.push_items_back(events) } + /// Push a gap after all events or gaps. + pub fn push_gap(&mut self, gap: Gap) { + self.chunks.push_gap_back(gap) + } + /// Insert events at a specified position. pub fn insert_events_at( &mut self, events: I, position: Position, - ) -> StdResult<(), LinkedChunkError> + ) -> Result<(), LinkedChunkError> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -265,14 +95,29 @@ impl RoomEvents { } /// Insert a gap at a specified position. - pub fn insert_gap_at( - &mut self, - gap: Gap, - position: Position, - ) -> StdResult<(), LinkedChunkError> { + pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> { self.chunks.insert_gap_at(gap, position) } + /// Replace the gap identified by `gap_identifier`, by events. + /// + /// Because the `gap_identifier` can represent non-gap chunk, this method + /// returns a `Result`. + /// + /// This method returns a reference to the (first if many) newly created + /// `Chunk` that contains the `items`. + pub fn replace_gap_at( + &mut self, + events: I, + gap_identifier: ChunkIdentifier, + ) -> Result<&Chunk, LinkedChunkError> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + self.chunks.replace_gap_at(events, gap_identifier) + } + /// Search for a chunk, and return its identifier. pub fn chunk_identifier<'a, P>(&'a self, predicate: P) -> Option where @@ -298,11 +143,18 @@ impl RoomEvents { self.chunks.rchunks() } + /// Iterate over the chunks, forward. + /// + /// The oldest chunk comes first. + pub fn chunks(&self) -> LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY> { + self.chunks.chunks() + } + /// Iterate over the chunks, starting from `identifier`, backward. pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> StdResult< + ) -> Result< LinkedChunkIterBackward<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, LinkedChunkError, > { @@ -314,10 +166,8 @@ impl RoomEvents { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> StdResult< - LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, - LinkedChunkError, - > { + ) -> Result, LinkedChunkError> + { self.chunks.chunks_from(identifier) } @@ -328,11 +178,18 @@ impl RoomEvents { self.chunks.ritems() } + /// Iterate over the events, forward. + /// + /// The oldest event comes first. + pub fn events(&self) -> impl Iterator { + self.chunks.items() + } + /// Iterate over the events, starting from `position`, backward. pub fn revents_from( &self, position: Position, - ) -> StdResult, LinkedChunkError> { + ) -> Result, LinkedChunkError> { self.chunks.ritems_from(position) } @@ -341,13 +198,13 @@ impl RoomEvents { pub fn events_from( &self, position: Position, - ) -> StdResult, LinkedChunkError> { + ) -> Result, LinkedChunkError> { self.chunks.items_from(position) } } impl fmt::Debug for RoomEvents { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { formatter.debug_struct("RoomEvents").field("chunk", &self.chunks).finish() } }