event cache: enforce unique access on the EventCacheStore
bnjbvr committed Mar 7, 2024
1 parent 0469c27 commit b7d6fd0
Showing 1 changed file with 54 additions and 30 deletions.
crates/matrix-sdk/src/event_cache/mod.rs (+54, -30)
@@ -128,8 +128,7 @@ impl EventCache {
         let inner = Arc::new(EventCacheInner {
             client: Arc::downgrade(client),
             by_room: Default::default(),
-            store,
-            process_lock: Default::default(),
+            store: Arc::new(Mutex::new(store)),
             drop_handles: Default::default(),
         });
 
@@ -181,9 +180,10 @@ impl EventCache {
                 // Forget everything we know; we could have missed events, and we have
                 // no way to reconcile at the moment!
                 // TODO: implement Smart Matching™,
+                let store = inner.store.lock().await;
                 let mut by_room = inner.by_room.write().await;
                 for room_id in by_room.keys() {
-                    if let Err(err) = inner.store.clear_room_events(room_id).await {
+                    if let Err(err) = store.clear_room_events(room_id).await {
                         error!("unable to clear room after room updates lag: {err}");
                     }
                 }
@@ -230,10 +230,12 @@ impl EventCache {
         // We could have received events during a previous sync; remove them all, since
         // we can't know where to insert the "initial events" with respect to
         // them.
-        self.inner.store.clear_room_events(room_id).await?;
+        let store = self.inner.store.lock().await;
+
+        store.clear_room_events(room_id).await?;
         let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
 
-        room_cache.inner.append_events(events).await?;
+        room_cache.inner.append_events(&**store, events).await?;
 
         Ok(())
     }
@@ -248,14 +250,13 @@ struct EventCacheInner {
     by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
 
     /// Backend used for storage.
-    store: Arc<dyn EventCacheStore>,
-
-    /// A lock to make sure that despite multiple updates coming to the
-    /// `EventCache`, it will only handle one at a time.
     ///
     /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
-    /// ensure that multiple updates will be applied in the correct order.
-    process_lock: Mutex<()>,
+    /// ensure that multiple updates will be applied in the correct order, which
+    /// is enforced by taking the store lock when handling an update.
+    ///
+    /// TODO: replace with a cross-process lock
+    store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
 
     /// Handles to keep alive the task listening to updates.
     drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
@@ -271,7 +272,7 @@ impl EventCacheInner {
     async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
         // First, take the lock that indicates we're processing updates, to avoid
         // handling multiple updates concurrently.
-        let _process_lock = self.process_lock.lock().await;
+        let store = self.store.lock().await;
 
         // Left rooms.
         for (room_id, left_room_update) in updates.leave {
@@ -280,7 +281,7 @@ impl EventCacheInner {
                 continue;
             };
 
-            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
+            if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await {
                 // Non-fatal error, try to continue to the next room.
                 error!("handling left room update: {err}");
             }
@@ -293,7 +294,9 @@ impl EventCacheInner {
                 continue;
             };
 
-            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
+            if let Err(err) =
+                room.inner.handle_joined_room_update(&**store, joined_room_update).await
+            {
                 // Non-fatal error, try to continue to the next room.
                 error!("handling joined room update: {err}");
             }
@@ -358,7 +361,7 @@ impl Debug for RoomEventCache {
 
 impl RoomEventCache {
     /// Create a new [`RoomEventCache`] using the given room and store.
-    fn new(room: Room, store: Arc<dyn EventCacheStore>) -> Self {
+    fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
         Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) }
     }
 
@@ -369,10 +372,9 @@ impl RoomEventCache {
     pub async fn subscribe(
         &self,
     ) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
-        Ok((
-            self.inner.store.room_events(self.inner.room.room_id()).await?,
-            self.inner.sender.subscribe(),
-        ))
+        let store = self.inner.store.lock().await;
+
+        Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe()))
     }
 }
 
@@ -381,8 +383,10 @@ struct RoomEventCacheInner {
     /// Sender part for subscribers to this room.
     sender: Sender<RoomEventCacheUpdate>,
 
-    /// A pointer to the store implementation used for this event cache.
-    store: Arc<dyn EventCacheStore>,
+    /// Backend used for storage, shared with the parent [`EventCacheInner`].
+    ///
+    /// See comment there.
+    store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
 
     /// The Client [`Room`] this event cache pertains to.
     room: Room,
@@ -391,13 +395,18 @@ struct RoomEventCacheInner {
 impl RoomEventCacheInner {
     /// Creates a new cache for a room, and subscribes to room updates, so as
     /// to handle new timeline events.
-    fn new(room: Room, store: Arc<dyn EventCacheStore>) -> Self {
+    fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
         let sender = Sender::new(32);
         Self { room, store, sender }
     }
 
-    async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
+    async fn handle_joined_room_update(
+        &self,
+        store: &dyn EventCacheStore,
+        updates: JoinedRoomUpdate,
+    ) -> Result<()> {
         self.handle_timeline(
+            store,
             updates.timeline,
             updates.ephemeral.clone(),
             updates.account_data,
@@ -409,6 +418,7 @@ impl RoomEventCacheInner {
 
     async fn handle_timeline(
         &self,
+        store: &dyn EventCacheStore,
         timeline: Timeline,
         ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
         account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
@@ -419,7 +429,7 @@ impl RoomEventCacheInner {
             // timeline, but we're not there yet. In the meanwhile, clear the
             // items from the room. TODO: implement Smart Matching™.
             trace!("limited timeline, clearing all previous events");
-            self.store.clear_room_events(self.room.room_id()).await?;
+            store.clear_room_events(self.room.room_id()).await?;
             let _ = self.sender.send(RoomEventCacheUpdate::Clear);
         }
 
@@ -431,7 +441,7 @@ impl RoomEventCacheInner {
             || !ambiguity_changes.is_empty()
         {
             trace!("adding new events");
-            self.store.add_room_events(self.room.room_id(), timeline.events.clone()).await?;
+            store.add_room_events(self.room.room_id(), timeline.events.clone()).await?;
 
             // Propagate events to observers.
             let _ = self.sender.send(RoomEventCacheUpdate::Append {
@@ -446,20 +456,34 @@ impl RoomEventCacheInner {
         Ok(())
     }
 
-    async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
-        self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes)
-            .await?;
+    async fn handle_left_room_update(
+        &self,
+        store: &dyn EventCacheStore,
+        updates: LeftRoomUpdate,
+    ) -> Result<()> {
+        self.handle_timeline(
+            store,
+            updates.timeline,
+            Vec::new(),
+            Vec::new(),
+            updates.ambiguity_changes,
+        )
+        .await?;
         Ok(())
     }
 
     /// Append a set of events to the room cache and storage, notifying
     /// observers.
-    async fn append_events(&self, events: Vec<SyncTimelineEvent>) -> Result<()> {
+    async fn append_events(
+        &self,
+        store: &dyn EventCacheStore,
+        events: Vec<SyncTimelineEvent>,
+    ) -> Result<()> {
         if events.is_empty() {
             return Ok(());
         }
 
-        self.store.add_room_events(self.room.room_id(), events.clone()).await?;
+        store.add_room_events(self.room.room_id(), events.clone()).await?;
 
         let _ = self.sender.send(RoomEventCacheUpdate::Append {
             events,
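For context, the core of this change is replacing the separate `process_lock: Mutex<()>` with a single `store: Arc<Mutex<Arc<dyn EventCacheStore>>>` field: any code that touches the store must now take the async lock first, so access to the `EventCacheStore` is unique, and updates are still applied in FIFO order because tokio's `Mutex` is fair. The sketch below illustrates that pattern in isolation; it is not SDK code, and the `EventCacheStore` trait, `MemoryStore`, and the update handler are simplified stand-ins invented for the example.

```rust
// Requires tokio = { version = "1", features = ["full"] } (or at least "sync",
// "macros", "rt-multi-thread").
use std::sync::Arc;

use tokio::sync::Mutex;

/// Hypothetical, minimal stand-in for the SDK's `EventCacheStore` trait.
trait EventCacheStore: Send + Sync {
    fn add_room_events(&self, room_id: &str, num_events: usize);
}

#[derive(Default)]
struct MemoryStore {
    /// Interior mutability so the trait method can take `&self`, as in the diff.
    total: std::sync::Mutex<usize>,
}

impl EventCacheStore for MemoryStore {
    fn add_room_events(&self, room_id: &str, num_events: usize) {
        let mut total = self.total.lock().unwrap();
        *total += num_events;
        println!("{room_id}: stored {num_events} events ({} in total)", *total);
    }
}

struct EventCacheInner {
    /// Same shape as the commit: the store is only reachable through the async
    /// mutex, so every update handler has to take the lock first.
    store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
}

impl EventCacheInner {
    async fn handle_room_update(&self, room_id: &str, num_events: usize) {
        // Taking the lock both guards the store and serializes updates: tokio's
        // `Mutex` is fair (FIFO), so updates are applied in arrival order.
        let store = self.store.lock().await;

        // `&**store` mirrors the diff: deref the guard, then the inner `Arc`,
        // to hand a plain `&dyn EventCacheStore` to the per-room code.
        Self::append_events(&**store, room_id, num_events);
    }

    fn append_events(store: &dyn EventCacheStore, room_id: &str, num_events: usize) {
        store.add_room_events(room_id, num_events);
    }
}

#[tokio::main]
async fn main() {
    let store: Arc<dyn EventCacheStore> = Arc::new(MemoryStore::default());
    let inner = Arc::new(EventCacheInner { store: Arc::new(Mutex::new(store)) });

    // Two concurrent "sync updates"; the store lock applies them one at a time.
    tokio::join!(
        inner.handle_room_update("!room_a:example.org", 3),
        inner.handle_room_update("!room_b:example.org", 5),
    );
}
```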

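The `subscribe()` hunk follows the same rule: the snapshot of stored events and the broadcast receiver are both obtained while the store lock is held, so no update can land between the two. Here is a toy, self-contained version of that shape with the same tokio dependency; it is not the SDK's API, and `RoomUpdate`, `RoomCache`, and the `Vec<String>`-backed store are invented for illustration.

```rust
use tokio::sync::{broadcast, Mutex};

#[derive(Clone, Debug)]
enum RoomUpdate {
    Clear,
    Append { events: Vec<String> },
}

struct RoomCache {
    /// All stored events for this toy room; the real code reads them back from
    /// the `EventCacheStore` instead.
    store: Mutex<Vec<String>>,
    sender: broadcast::Sender<RoomUpdate>,
}

impl RoomCache {
    fn new() -> Self {
        // The diff uses `Sender::new(32)`; `broadcast::channel(32)` is the
        // older, equivalent way to get a sender with capacity 32.
        let (sender, _initial_receiver) = broadcast::channel(32);
        Self { store: Mutex::new(Vec::new()), sender }
    }

    async fn subscribe(&self) -> (Vec<String>, broadcast::Receiver<RoomUpdate>) {
        // Lock first, then snapshot + subscribe, so no update can slip in
        // between reading the snapshot and creating the receiver.
        let store = self.store.lock().await;
        (store.clone(), self.sender.subscribe())
    }

    async fn append(&self, events: Vec<String>) {
        let mut store = self.store.lock().await;
        store.extend(events.clone());
        // A send error only means nobody is subscribed yet; ignore it, like
        // the `let _ = self.sender.send(..)` calls in the diff.
        let _ = self.sender.send(RoomUpdate::Append { events });
    }

    async fn clear(&self) {
        self.store.lock().await.clear();
        let _ = self.sender.send(RoomUpdate::Clear);
    }
}

#[tokio::main]
async fn main() {
    let cache = RoomCache::new();
    cache.append(vec!["$event_1".to_owned()]).await;

    // The snapshot contains $event_1; later sends arrive on the receiver.
    let (snapshot, mut updates) = cache.subscribe().await;
    println!("initial events: {snapshot:?}");

    cache.append(vec!["$event_2".to_owned()]).await;
    cache.clear().await;

    println!("first update:  {:?}", updates.recv().await.unwrap());
    println!("second update: {:?}", updates.recv().await.unwrap());
}
```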