Skip to content

Commit

Permalink
timeline: prevent deadlock in replace_with_initial_events
Browse files Browse the repository at this point in the history
The `state` lock was taken at the top level of this function, and
indirectly implicitly in the `set_fully_read_event` function. This fixes
it, and adds a regression test.
  • Loading branch information
bnjbvr committed Apr 22, 2024
1 parent c471ee4 commit f63187f
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 75 deletions.
107 changes: 43 additions & 64 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use ruma::RoomId;
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
fully_read::FullyReadEvent,
poll::unstable_start::UnstablePollStartEventContent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
Expand Down Expand Up @@ -433,6 +432,44 @@ impl<P: RoomDataProvider> TimelineInner<P> {
self.state.write().await.clear();
}

/// Replaces the content of the current timeline with initial events.
///
/// Also sets up read receipts and the read marker for a live timeline of a
/// room.
///
/// This is all done with a single lock guard, since we don't want the state
/// to be modified between the clear and re-insertion of new events.
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
let mut state = self.state.write().await;

state.clear();

let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
self.populate_initial_user_receipt(ReceiptType::Read).await;
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}

if !events.is_empty() {
state
.add_events_at(
events,
TimelineEnd::Back { from_cache: true },
&self.room_data_provider,
&self.settings,
)
.await;
}

if track_read_markers {
if let Some(fully_read_event_id) =
self.room_data_provider.load_fully_read_marker().await
{
state.set_fully_read_event(fully_read_event_id);
}
}
}

pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
}
Expand Down Expand Up @@ -674,6 +711,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}
}

#[cfg(any(test, feature = "testing"))]
pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) {
self.state.write().await.set_fully_read_event(fully_read_event_id);
}
Expand Down Expand Up @@ -950,65 +988,6 @@ impl TimelineInner {
&self.room_data_provider
}

/// Replaces the content of the current timeline with initial events.
///
/// Also sets up read receipts and the read marker for a live timeline of a
/// room.
///
/// This is all done with a single lock guard, since we don't want the state
/// to be modified between the clear and re-insertion of new events.
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
let mut state = self.state.write().await;

state.clear();

let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
self.populate_initial_user_receipt(ReceiptType::Read).await;
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}

if !events.is_empty() {
state
.add_events_at(
events,
TimelineEnd::Back { from_cache: true },
&self.room_data_provider,
&self.settings,
)
.await;
}

if track_read_markers {
self.load_fully_read_event().await;
}
}

/// Get the current fully-read event, from storage.
pub(super) async fn fully_read_event(&self) -> Option<FullyReadEvent> {
match self.room().account_data_static().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => Some(fully_read),
Err(e) => {
error!("Failed to deserialize fully-read account data: {e}");
None
}
},
Err(e) => {
error!("Failed to get fully-read account data from the store: {e}");
None
}
_ => None,
}
}

/// Load the current fully-read event in this inner timeline from storage.
pub(super) async fn load_fully_read_event(&self) {
if let Some(fully_read) = self.fully_read_event().await {
self.set_fully_read_event(fully_read.content.event_id).await;
}
}

#[instrument(skip(self))]
pub(super) async fn fetch_in_reply_to_details(
&self,
Expand Down Expand Up @@ -1126,10 +1105,10 @@ impl TimelineInner {
}
}
SendReceiptType::FullyRead => {
if let Some(old_fully_read) = self.fully_read_event().await {
if let Some(relative_pos) = state
.meta
.compare_events_positions(&old_fully_read.content.event_id, event_id)
if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
{
if let Some(relative_pos) =
state.meta.compare_events_positions(&prev_event_id, event_id)
{
return relative_pos == RelativePosition::After;
}
Expand Down
35 changes: 33 additions & 2 deletions crates/matrix-sdk-ui/src/timeline/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use matrix_sdk::test_utils::events::EventFactory;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB, CAROL};
use ruma::{
Expand All @@ -29,13 +30,16 @@ use ruma::{
},
FullStateEventContent,
},
owned_event_id,
};
use stream_assert::assert_next_matches;

use super::TestTimeline;
use crate::timeline::{
event_item::AnyOtherFullStateEventContent, inner::TimelineEnd, MembershipChange,
TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
event_item::AnyOtherFullStateEventContent,
inner::{TimelineEnd, TimelineInnerSettings},
tests::TestRoomDataProvider,
MembershipChange, TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
};

#[async_test]
Expand Down Expand Up @@ -72,6 +76,33 @@ async fn test_initial_events() {
assert_matches!(&item.kind, TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(_)));
}

#[async_test]
async fn test_replace_with_initial_events_and_read_marker() {
let event_id = owned_event_id!("$1");
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::default().with_fully_read_marker(event_id),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

let factory = EventFactory::new();
let ev = factory.text_msg("hey").sender(*ALICE).into_sync();

timeline.inner.add_events_at(vec![ev], TimelineEnd::Back { from_cache: false }).await;

let items = timeline.inner.items().await;
assert_eq!(items.len(), 2);
assert!(items[0].is_day_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "hey");

let ev = factory.text_msg("yo").sender(*BOB).into_sync();
timeline.inner.replace_with_initial_events(vec![ev]).await;

let items = timeline.inner.items().await;
assert_eq!(items.len(), 2);
assert!(items[0].is_day_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "yo");
}

#[async_test]
async fn test_sticker() {
let timeline = TestTimeline::new();
Expand Down
14 changes: 12 additions & 2 deletions crates/matrix-sdk-ui/src/timeline/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,17 @@ type ReadReceiptMap =
#[derive(Clone, Default)]
struct TestRoomDataProvider {
initial_user_receipts: ReadReceiptMap,
fully_read_marker: Option<OwnedEventId>,
}

impl TestRoomDataProvider {
fn with_initial_user_receipts(initial_user_receipts: ReadReceiptMap) -> Self {
Self { initial_user_receipts }
fn with_initial_user_receipts(mut self, initial_user_receipts: ReadReceiptMap) -> Self {
self.initial_user_receipts = initial_user_receipts;
self
}
fn with_fully_read_marker(mut self, event_id: OwnedEventId) -> Self {
self.fully_read_marker = Some(event_id);
self
}
}

Expand Down Expand Up @@ -345,6 +351,10 @@ impl RoomDataProvider for TestRoomDataProvider {

Some((push_rules, push_context))
}

async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
self.fully_read_marker.clone()
}
}

pub(super) async fn assert_event_is_updated(
Expand Down
8 changes: 4 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/tests/read_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ async fn test_initial_public_unthreaded_receipt() {
);

let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

Expand All @@ -515,7 +515,7 @@ async fn test_initial_public_main_thread_receipt() {
);

let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

Expand All @@ -540,7 +540,7 @@ async fn test_initial_private_unthreaded_receipt() {
);

let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

Expand All @@ -565,7 +565,7 @@ async fn test_initial_private_main_thread_receipt() {
);

let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

Expand Down
29 changes: 26 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use indexmap::IndexMap;
use matrix_sdk::{deserialized_responses::TimelineEvent, Result};
use matrix_sdk::{event_cache, Room};
use matrix_sdk_base::latest_event::LatestEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
use ruma::{
events::receipt::{Receipt, ReceiptThread, ReceiptType},
events::{
fully_read::FullyReadEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
},
push::{PushConditionRoomCtx, Ruleset},
EventId, OwnedEventId, OwnedUserId, RoomVersionId, UserId,
};
#[cfg(feature = "e2e-encryption")]
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
use tracing::{debug, error};

use super::{Profile, TimelineBuilder};
Expand Down Expand Up @@ -81,6 +84,9 @@ pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static {
/// Loads read receipts for an event from the storage backend.
async fn load_event_receipts(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt>;

/// Load the current fully-read event id, from storage.
async fn load_fully_read_marker(&self) -> Option<OwnedEventId>;

async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)>;
}

Expand Down Expand Up @@ -188,6 +194,23 @@ impl RoomDataProvider for Room {
}
}
}

async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
match self.account_data_static::<FullyReadEventContent>().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => Some(fully_read.content.event_id),
Err(e) => {
error!("Failed to deserialize fully-read account data: {e}");
None
}
},
Err(e) => {
error!("Failed to get fully-read account data from the store: {e}");
None
}
_ => None,
}
}
}

// Internal helper to make most of retry_event_decryption independent of a room
Expand Down

0 comments on commit f63187f

Please sign in to comment.