Skip to content

Commit

Permalink
refactor: new type wrapping (EventHandlerHandle, Receiver)
Browse files Browse the repository at this point in the history
  • Loading branch information
torrybr committed Nov 11, 2024
1 parent ab3e73e commit dd1c78f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 44 deletions.
19 changes: 19 additions & 0 deletions crates/matrix-sdk/src/live_location_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use ruma::{
events::{beacon_info::BeaconInfoEventContent, location::LocationContent},
MilliSecondsSinceUnixEpoch, OwnedUserId,
};
use tokio::sync::broadcast;

use crate::event_handler::EventHandlerHandle;

/// Details of the last known location beacon.
#[derive(Clone, Debug)]
Expand All @@ -38,3 +41,19 @@ pub struct LiveLocationShare {
/// The user ID of the person sharing their live location.
pub user_id: OwnedUserId,
}

/// A subscription to live location sharing events.
///
/// This struct holds the `EventHandlerHandle` and the
/// `Receiver<LiveLocationShare>` for live location shares.
#[derive(Debug)]
pub struct LiveLocationSubscription {
/// Manages the event handler lifecycle.
pub event_handler_handle: EventHandlerHandle,
/// Receives live location shares.
pub receiver: broadcast::Receiver<LiveLocationShare>,
}

impl Drop for LiveLocationSubscription {
fn drop(&mut self) {}
}
60 changes: 25 additions & 35 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ use matrix_sdk_base::{
ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
StateStoreDataValue,
};
use matrix_sdk_common::{
deserialized_responses::SyncTimelineEvent,
executor::{spawn, JoinHandle},
timeout::timeout,
};
use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, timeout::timeout};
use mime::Mime;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
Expand Down Expand Up @@ -136,7 +132,7 @@ use crate::{
error::{BeaconError, WrongRoomState},
event_cache::{self, EventCacheDropHandles, RoomEventCache},
event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
live_location_share::{LastLocation, LiveLocationShare},
live_location_share::{LastLocation, LiveLocationShare, LiveLocationSubscription},
media::{MediaFormat, MediaRequestParameters},
notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
Expand Down Expand Up @@ -3151,45 +3147,39 @@ impl Room {
///
/// The returned receiver will receive a new event for each sync response
/// that contains a `m.beacon` event.
pub fn subscribe_to_live_location_shares(
&self,
) -> (JoinHandle<()>, broadcast::Receiver<LiveLocationShare>) {
pub fn subscribe_to_live_location_shares(&self) -> LiveLocationSubscription {
let (sender, receiver) = broadcast::channel(128);

let client = self.client.clone();
let room_id = self.room_id().to_owned();
let room = self.clone();

let handle: JoinHandle<()> = spawn(async move {
let beacon_event_handler_handle = client.add_room_event_handler(&room_id, {
move |event: OriginalSyncBeaconEvent| async move {
let user_id = event.sender;
let beacon_event_handler_handle = client.add_room_event_handler(&room_id, {
move |event: OriginalSyncBeaconEvent| async move {
let user_id = event.sender;

let beacon_info = match room.get_user_beacon_info(&user_id).await {
Ok(info) => info.content,
Err(e) => {
eprintln!("Failed to get beacon info: {:?}", e);
return;
}
};

let live_location_share = LiveLocationShare {
user_id: user_id.clone(),
last_location: LastLocation {
location: event.content.location,
ts: event.content.ts,
},
beacon_info,
};

let _ = sender.send(live_location_share);
}
});
let beacon_info = match room.get_user_beacon_info(&user_id).await {
Ok(info) => info.content,
Err(e) => {
eprintln!("Failed to get beacon info: {:?}", e);
return;
}
};

let _ = beacon_event_handler_handle;
let live_location_share = LiveLocationShare {
last_location: LastLocation {
location: event.content.location,
ts: event.content.ts,
},
user_id,
beacon_info,
};

let _ = sender.send(live_location_share);
}
});

(handle, receiver)
LiveLocationSubscription { event_handler_handle: beacon_event_handler_handle, receiver }
}

/// Load pinned state events for a room from the `/state` endpoint in the
Expand Down
14 changes: 5 additions & 9 deletions crates/matrix-sdk/tests/integration/room/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ async fn test_subscribe_to_live_location_shares() {

let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();

let (task_handle, mut subscriber) = room.subscribe_to_live_location_shares();
let mut subscription = room.subscribe_to_live_location_shares();

let mut timeline_events = Vec::new();

Expand Down Expand Up @@ -250,7 +250,7 @@ async fn test_subscribe_to_live_location_shares() {

for i in 0..timeline_events.len() {
let live_location_share =
subscriber.recv().await.expect("Failed to receive live location share");
subscription.receiver.recv().await.expect("Failed to receive live location share");

assert_eq!(live_location_share.user_id.to_string(), "@example:localhost");

Expand All @@ -273,8 +273,6 @@ async fn test_subscribe_to_live_location_shares() {
assert_eq!(live_location_share.beacon_info.ts, current_time);
assert_eq!(live_location_share.beacon_info.asset.type_, AssetType::Self_);
}

task_handle.await.unwrap();
}

#[async_test]
Expand Down Expand Up @@ -350,7 +348,7 @@ async fn test_subscribe_to_live_location_shares_with_multiple_users() {

let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();

let (task_handle, mut subscriber) = room.subscribe_to_live_location_shares();
let mut subscription = room.subscribe_to_live_location_shares();

sync_builder.add_joined_room(JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(
[
Expand Down Expand Up @@ -400,7 +398,7 @@ async fn test_subscribe_to_live_location_shares_with_multiple_users() {
server.reset().await;

let live_location_share =
subscriber.recv().await.expect("Failed to receive live location share");
subscription.receiver.recv().await.expect("Failed to receive live location share");

assert_eq!(live_location_share.user_id.to_string(), "@user1:localhost");

Expand All @@ -423,7 +421,7 @@ async fn test_subscribe_to_live_location_shares_with_multiple_users() {
assert_eq!(live_location_share.beacon_info.asset.type_, AssetType::Self_);

let live_location_share =
subscriber.recv().await.expect("Failed to receive live location share");
subscription.receiver.recv().await.expect("Failed to receive live location share");

assert_eq!(live_location_share.user_id.to_string(), "@user2:localhost");

Expand All @@ -444,6 +442,4 @@ async fn test_subscribe_to_live_location_shares_with_multiple_users() {
assert_eq!(live_location_share.beacon_info.timeout, Duration::from_millis(3000));
assert_eq!(live_location_share.beacon_info.ts, current_time);
assert_eq!(live_location_share.beacon_info.asset.type_, AssetType::Self_);

task_handle.await.unwrap();
}

0 comments on commit dd1c78f

Please sign in to comment.