Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timeline: after an event cache update lag, fetch previous events back from the cache #3345

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use matrix_sdk::{
executor::spawn,
Room,
};
use ruma::{
events::{receipt::ReceiptType, AnySyncTimelineEvent},
RoomVersionId,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::{broadcast, mpsc};
use tracing::{info, info_span, trace, warn, Instrument, Span};

Expand All @@ -35,7 +32,7 @@ use super::{
queue::send_queued_messages,
BackPaginationStatus, Timeline, TimelineDropHandle,
};
use crate::{timeline::inner::TimelineEnd, unable_to_decrypt_hook::UtdHookManager};
use crate::unable_to_decrypt_hook::UtdHookManager;

/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
Expand Down Expand Up @@ -137,26 +134,16 @@ impl TimelineBuilder {
let (events, mut event_subscriber) = room_event_cache.subscribe().await?;

let has_events = !events.is_empty();
let track_read_marker_and_receipts = settings.track_read_receipts;

let mut inner = TimelineInner::new(room, unable_to_decrypt_hook).with_settings(settings);

if track_read_marker_and_receipts {
inner.populate_initial_user_receipt(ReceiptType::Read).await;
inner.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}
let inner = TimelineInner::new(room, unable_to_decrypt_hook).with_settings(settings);

if has_events {
inner.add_events_at(events, TimelineEnd::Back { from_cache: true }).await;
}
if track_read_marker_and_receipts {
inner.load_fully_read_event().await;
}
inner.replace_with_initial_events(events).await;

let room = inner.room();
let client = room.client();

let room_update_join_handle = spawn({
let room_event_cache = room_event_cache.clone();
let inner = inner.clone();

let span =
Expand All @@ -177,7 +164,23 @@ impl TimelineBuilder {
num_skipped,
"Lagged behind event cache updates, resetting timeline"
);
inner.clear().await;

// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
// after clearing it.
//
// If we can't get a handle on the room cache's events, just clear the
// current timeline.
match room_event_cache.subscribe().await {
Ok((events, _)) => {
inner.replace_with_initial_events(events).await;
}
Err(err) => {
warn!("Error when re-inserting initial events into the timeline: {err}");
inner.clear().await;
}
}

continue;
}
};
Expand Down
36 changes: 35 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {

/// Populates our own latest read receipt in the in-memory by-user read
/// receipt cache.
pub(super) async fn populate_initial_user_receipt(&mut self, receipt_type: ReceiptType) {
pub(super) async fn populate_initial_user_receipt(&self, receipt_type: ReceiptType) {
let own_user_id = self.room_data_provider.own_user_id().to_owned();

let mut read_receipt = self
Expand Down Expand Up @@ -965,6 +965,40 @@ impl TimelineInner {
&self.room_data_provider
}

/// Replaces the content of the current timeline with initial events.
///
/// Also sets up read receipts and the read marker for a live timeline of a
/// room.
///
/// This is all done with a single lock guard, since we don't want the state
/// to be modified between the clear and re-insertion of new events.
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
let mut state = self.state.write().await;

state.clear();

let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
self.populate_initial_user_receipt(ReceiptType::Read).await;
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}

if !events.is_empty() {
state
.add_events_at(
events,
TimelineEnd::Back { from_cache: true },
&self.room_data_provider,
&self.settings,
)
.await;
}

if track_read_markers {
self.load_fully_read_event().await;
}
}

/// Get the current fully-read event, from storage.
pub(super) async fn fully_read_event(&self) -> Option<FullyReadEvent> {
match self.room().account_data_static().await {
Expand Down
Loading