From 9f48d07a64e8c9836cb56f5f52283c8ff0a9ac5d Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 2 Feb 2024 09:26:59 +0200 Subject: [PATCH] use connection id for better session tracking --- solar/src/actors/replication/ebt/manager.rs | 156 ++++++++++++++------ 1 file changed, 113 insertions(+), 43 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index c2d1167..fe6675a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -28,7 +28,10 @@ use serde_json::Value; use crate::{ actors::{ muxrpc::{ReqNo, RpcBlobsGetEvent}, - network::{connection::ConnectionData, connection_manager::ConnectionEvent}, + network::{ + connection::{ConnectionData, ConnectionId}, + connection_manager::ConnectionEvent, + }, replication::{ blobs, ebt::{clock, replicator, EncodedClockValue, VectorClock}, @@ -46,14 +49,14 @@ use crate::{ pub enum EbtEvent { WaitForSessionRequest(ConnectionData), RequestSession(ConnectionData), - SessionInitiated(ReqNo, SsbId, SessionRole), - SendClock(ReqNo, VectorClock), - SendMessage(ReqNo, SsbId, Value), - ReceivedClock(ReqNo, SsbId, VectorClock), + SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole), + SendClock(ConnectionId, ReqNo, VectorClock), + SendMessage(ConnectionId, ReqNo, SsbId, Value), + ReceivedClock(ConnectionId, ReqNo, SsbId, VectorClock), ReceivedMessage(Message), - SessionConcluded(SsbId), - SessionTimeout(ConnectionData), - Error(ConnectionData, ReqNo, SsbId, String), + SessionConcluded(ConnectionId, SsbId), + SessionTimeout(ConnectionData, SsbId), + Error(ConnectionData, Option, SsbId, String), } /// Role of a peer in an EBT session. @@ -75,7 +78,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashMap, + active_sessions: HashSet<(ConnectionId, SsbId)>, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -97,6 +100,9 @@ pub struct EbtManager { /// request number. // // TODO: Do we want to remove each entry when the session concludes? + // + // TODO: Rather use connection ID as the key (req no is not unique + // across sessions). sent_clocks: HashMap, /// The sequence number of the latest message sent to each peer /// for each requested feed. @@ -106,7 +112,7 @@ pub struct EbtManager { impl Default for EbtManager { fn default() -> Self { EbtManager { - active_sessions: HashMap::new(), + active_sessions: HashSet::new(), _feed_wait_timeout: 3, _is_replication_loop_active: false, local_clock: HashMap::new(), @@ -149,7 +155,7 @@ impl EbtManager { /// Retrieve either the local vector clock or the stored vector clock /// for the peer represented by the given SSB ID. - fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option { match ssb_id { Some(id) => self.peer_clocks.get(id).cloned(), None => Some(self.local_clock.to_owned()), @@ -228,7 +234,7 @@ impl EbtManager { /// flag. fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { // Retrieve the vector clock for the first peer. - if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) { + if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) { // Check if the second peer is represented in the vector clock. if let Some(encoded_seq_no) = clock.get(&ssb_id) { // Check if the receive flag is true. @@ -273,16 +279,21 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + //fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + fn register_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { self.active_sessions - .insert(peer_ssb_id.to_owned(), (req_no, session_role)); + .insert((connection_id, peer_ssb_id.to_owned())); - trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); + trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id); } /// Remove the given peer from the list of active session. - fn remove_session(&mut self, peer_ssb_id: &SsbId) { - let _ = self.active_sessions.remove(peer_ssb_id); + fn remove_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { + // TODO: Clean-up the string story so we're not sprinkling additional + // allocations everywhere. + let _ = self + .active_sessions + .remove(&(connection_id, peer_ssb_id.to_owned())); } /// Revoke a replication request for the feed represented by the given SSB @@ -379,7 +390,10 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. - if !self.active_sessions.contains_key(&peer_ssb_id) { + if !self + .active_sessions + .contains(&(connection_data.id, peer_ssb_id)) + { trace!( target: "ebt", "Requesting an EBT session with {:?}", @@ -397,13 +411,14 @@ impl EbtManager { async fn handle_session_initiated( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, session_role: SessionRole, ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id, req_no, session_role.to_owned()); + self.register_session(connection_id, &peer_ssb_id); let local_clock = self.local_clock.to_owned(); match session_role { @@ -411,10 +426,12 @@ impl EbtManager { // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); + trace!(target: "ebt-replication", "Sending clock as responder for request {}", req_no); + ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), )) .await?; } @@ -427,12 +444,19 @@ impl EbtManager { Ok(()) } - fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option { + fn handle_send_clock( + &mut self, + _connection_id: ConnectionId, + req_no: ReqNo, + clock: VectorClock, + ) -> Option { + // TODO: Include connection ID in sent clock tracking. self.sent_clocks.insert(req_no, clock) } async fn handle_received_clock( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, clock: VectorClock, @@ -450,11 +474,17 @@ impl EbtManager { // assumed that the clock was sent in response to a locally-sent // EBT replicate request. Ie. the session was requested by the // local peer. - if !self.active_sessions.contains_key(&peer_ssb_id) { + // + // TODO: Can we avoid the allocation on `peer_ssb_id`? + if !self + .active_sessions + .contains(&(connection_id, peer_ssb_id.to_owned())) + { ch_broker .send(BrokerEvent::new( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, req_no, peer_ssb_id.to_owned(), SessionRole::Requester, @@ -472,7 +502,7 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), )) .await?; } @@ -484,7 +514,12 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, peer_ssb_id.to_owned(), msg)), + BrokerMessage::Ebt(EbtEvent::SendMessage( + connection_id, + req_no, + peer_ssb_id.to_owned(), + msg, + )), )) .await?; } @@ -564,6 +599,10 @@ impl EbtManager { Ok(()) } + /* + TODO: Reintroduce this when we figure out the connection ID / request ID + association. + /// Look up the latest sequence number for the updated feed, encode it as /// the single entry of a vector clock and send that to any active session /// peers. @@ -599,14 +638,26 @@ impl EbtManager { Ok(()) } + */ - async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { - trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); - self.remove_session(&peer_ssb_id); + async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) { + trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id); + self.remove_session(connection_id, &peer_ssb_id); } - async fn handle_session_timeout(&mut self, connection_data: ConnectionData) -> Result<()> { - trace!(target: "ebt-replication", "Session timeout while waiting for request"); + async fn handle_session_timeout( + &mut self, + connection_data: ConnectionData, + peer_ssb_id: SsbId, + ) -> Result<()> { + trace!(target: "ebt-replication", "Session timeout while waiting for request from {} on connection {}", peer_ssb_id, connection_data.id); + + // Session should not have been initiated in the first place, meaning + // that this removal action should be unnecessary. Keeping it here + // for now out of caution. + // + // TODO: Remove this line when it's clear that it's not needed. + self.remove_session(connection_data.id, &peer_ssb_id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); @@ -625,15 +676,28 @@ impl EbtManager { async fn handle_error( &mut self, connection_data: ConnectionData, - req_no: ReqNo, + req_no: Option, peer_ssb_id: SsbId, err_msg: String, ) -> Result<()> { - trace!(target: "ebt-replication", "Session error with {} for request number {}: {}", peer_ssb_id, req_no, err_msg); + trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, err_msg); + + self.remove_session(connection_data.id, &peer_ssb_id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + )) + .await?; + + // TODO: Match on error variant and fallback to class replication if + // the error was `Error::EbtReplicate`. + + /* // Fallback to classic replication. ch_broker .send(BrokerEvent::new( @@ -641,6 +705,7 @@ impl EbtManager { BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), )) .await?; + */ Ok(()) } @@ -684,17 +749,17 @@ impl EbtManager { EbtEvent::RequestSession(connection_data) => { self.handle_request_session(connection_data).await; } - EbtEvent::SessionInitiated(req_no, peer_ssb_id, session_role) => { - if let Err(err) = self.handle_session_initiated(req_no, peer_ssb_id, session_role).await { + EbtEvent::SessionInitiated(connection_id, req_no, peer_ssb_id, session_role) => { + if let Err(err) = self.handle_session_initiated(connection_id, req_no, peer_ssb_id, session_role).await { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(req_no, clock) => { + EbtEvent::SendClock(connection_id, req_no, clock) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - let _ = self.handle_send_clock(req_no, clock); + let _ = self.handle_send_clock(connection_id, req_no, clock); } - EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { - if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { + EbtEvent::ReceivedClock(connection_id, req_no, peer_ssb_id, clock) => { + if let Err(err) = self.handle_received_clock(connection_id, req_no, peer_ssb_id, clock).await { error!("Error while handling 'received clock' event: {}", err) } } @@ -703,17 +768,17 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { + EbtEvent::SendMessage(_connection_id, _req_no, peer_ssb_id, msg) => { trace!(target: "ebt-replication", "Sending message: {:?}...", msg); if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { error!("Error while handling 'send message' event: {}", err) } } - EbtEvent::SessionConcluded(connection_data) => { - self.handle_session_concluded(connection_data).await; + EbtEvent::SessionConcluded(connection_id, peer_ssb_id) => { + self.handle_session_concluded(connection_id, peer_ssb_id).await; } - EbtEvent::SessionTimeout(connection_data) => { - if let Err(err) = self.handle_session_timeout(connection_data).await { + EbtEvent::SessionTimeout(connection_data, peer_ssb_id) => { + if let Err(err) = self.handle_session_timeout(connection_data, peer_ssb_id).await { error!("Error while handling 'session timeout' event: {}", err) } } @@ -723,14 +788,19 @@ impl EbtManager { } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg { + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { debug!("Received KV store event from broker"); + /* + TODO: Reintroduce this later, once Manyverse restart + issue is solved. + // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. if let Err(err) = self.handle_local_store_updated(ssb_id).await { error!("Error while handling 'local store updated' event: {}", err) } + */ } } }