From 52557c62935f411869b5f55faa43875432e71663 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 2 Feb 2024 08:57:38 +0200 Subject: [PATCH 01/10] pass conn id into muxrpc ebt handler for targeted responses --- solar/src/actors/muxrpc/ebt.rs | 144 ++++++++++++--------------------- 1 file changed, 53 insertions(+), 91 deletions(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index e2deec4..61fc4ba 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -46,6 +46,7 @@ where ch_broker: &mut ChBrokerSend, peer_ssb_id: String, active_request: Option, + connection_id: usize, ) -> Result { trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op); @@ -62,7 +63,8 @@ where RpcInput::Network(req_no, rpc::RecvMsg::RpcRequest(req)) => { match ApiMethod::from_rpc_body(req) { Some(ApiMethod::EbtReplicate) => { - self.recv_ebtreplicate(api, *req_no, req, peer_ssb_id).await + self.recv_ebtreplicate(api, *req_no, req, peer_ssb_id, connection_id) + .await } _ => Ok(false), } @@ -76,6 +78,7 @@ where .send(BrokerEvent::new( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::ReceivedClock( + connection_id, *req_no, peer_ssb_id, clock, @@ -88,7 +91,7 @@ where } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id) + self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id, connection_id) .await } // Handle an incoming MUXRPC 'cancel stream' response. @@ -101,20 +104,43 @@ where } // Handle a broker message. RpcInput::Message(msg) => match msg { - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => { - // Serialize the vector clock as a JSON string. - let json_clock = serde_json::to_string(&clock)?; - // The request number must be negative (response). 
- api.ebt_clock_res_send(-(*req_no), &json_clock).await?; + BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock)) => { + // Only send the clock if the associated connection is + // being handled by this instance of the handler. + // + // This prevents the clock being sent to every peer with + // whom we have an active session and matching request + // number. + if *conn_id == connection_id { + // Serialize the vector clock as a JSON string. + let json_clock = serde_json::to_string(&clock)?; + // The request number must be negative (response). + api.ebt_clock_res_send(-(*req_no), &json_clock).await?; + + trace!(target: "ebt", "Sent clock to connection {} with request number {}", conn_id, req_no); + } Ok(false) } - BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, ssb_id, msg)) => { - // Ensure the message is sent to the correct peer. - if peer_ssb_id == *ssb_id { - let json_msg = msg.to_string(); - // The request number must be negative (response). - api.ebt_feed_res_send(-(*req_no), &json_msg).await?; + BrokerMessage::Ebt(EbtEvent::SendMessage(conn_id, req_no, ssb_id, msg)) => { + // Only send the message if the associated connection is + // being handled by this instance of the handler. + // + // This prevents the message being sent to every peer with + // whom we have an active session and matching request + // number. + if *conn_id == connection_id { + // TODO: Remove this check; made redundant by the + // connection ID check. + // + // Ensure the message is sent to the correct peer. + if peer_ssb_id == *ssb_id { + let json_msg = msg.to_string(); + // The request number must be negative (response). + api.ebt_feed_res_send(-(*req_no), &json_msg).await?; + + trace!(target: "ebt", "Sent message to {} on connection {}", ssb_id, conn_id); + } } Ok(false) @@ -145,6 +171,7 @@ where req_no: ReqNo, req: &rpc::Body, peer_ssb_id: String, + connection_id: usize, ) -> Result { // Deserialize the args from an incoming EBT replicate request. 
let mut args: Vec = serde_json::from_value(req.args.clone())?; @@ -180,6 +207,7 @@ where .send(BrokerEvent::new( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, req_no, peer_ssb_id, SessionRole::Responder, @@ -198,6 +226,7 @@ where req_no: ReqNo, res: &[u8], peer_ssb_id: String, + connection_id: usize, ) -> Result { trace!(target: "ebt-handler", "Received RPC response: {}", req_no); @@ -214,7 +243,12 @@ where ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), + BrokerMessage::Ebt(EbtEvent::ReceivedClock( + connection_id, + req_no, + peer_ssb_id, + clock, + )), )) .await?; } else { @@ -243,11 +277,13 @@ where Ok(false) } - /// Close the stream and remove the associated request from the map of - /// active requests. + /// Remove the associated request from the map of active requests and close + /// the stream. async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { - api.rpc().send_stream_eof(-req_no).await?; + trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no); + self.active_requests.remove(&req_no); + api.rpc().send_stream_eof(-req_no).await?; Ok(true) } @@ -261,78 +297,4 @@ where Err(Error::EbtReplicate((req_no, err_msg.to_string()))) } - - /* - /// Process an incoming MUXRPC response. The response is expected to - /// contain an SSB message. - async fn recv_rpc_response( - &mut self, - _api: &mut ApiCaller, - ch_broker: &mut ChBrokerSend, - req_no: i32, - res: &[u8], - ) -> Result { - // Only handle the response if we made the request. - if self.peers.contains_key(&req_no) { - // First try to deserialize the response into a message value. - // If that fails, try to deserialize into a message KVT and then - // convert that into a message value. Return an error if that fails. 
- // This approach allows us to handle the unlikely event that - // messages are sent as KVTs and not simply values. - // - // Validation of the message signature and fields is also performed - // as part of the call to `from_slice`. - let msg = match Message::from_slice(res) { - Ok(msg) => msg, - Err(_) => MessageKvt::from_slice(res)?.into_message()?, - }; - - // Retrieve the sequence number of the most recent message for - // the peer that authored the received message. - let last_seq = KV_STORE - .read() - .await - .get_latest_seq(&msg.author().to_string())? - .unwrap_or(0); - - // Validate the sequence number. - if msg.sequence() == last_seq + 1 { - // Append the message to the feed. - KV_STORE.write().await.append_feed(msg.clone()).await?; - - info!( - "received msg number {} from {}", - msg.sequence(), - msg.author() - ); - - // Extract blob references from the received message and - // request those blobs if they are not already in the local - // blobstore. - for key in self.extract_blob_refs(&msg) { - if !BLOB_STORE.read().await.exists(&key) { - let event = super::blobs_get::RpcBlobsGetEvent::Get(dto::BlobsGetIn { - key, - size: None, - max: None, - }); - let broker_msg = BrokerEvent::new(Destination::Broadcast, event); - ch_broker.send(broker_msg).await.unwrap(); - } - } - } else { - warn!( - "received out-of-order msg from {}; recv: {} db: {}", - &msg.author().to_string(), - msg.sequence(), - last_seq - ); - } - - Ok(true) - } else { - Ok(false) - } - } - */ } From 1fa7d82f5250ac8b11d47c539b3b6c02cb8e6362 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 2 Feb 2024 09:23:53 +0200 Subject: [PATCH 02/10] use a type alias for connection id --- solar/src/actors/network/connection.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/solar/src/actors/network/connection.rs b/solar/src/actors/network/connection.rs index 17911c6..79c9895 100644 --- a/solar/src/actors/network/connection.rs +++ b/solar/src/actors/network/connection.rs @@ 
-60,11 +60,14 @@ impl Display for TcpConnection { } } +/// Unique ID for a connection. +pub type ConnectionId = usize; + /// Connection data. #[derive(Debug, Default, Clone)] pub struct ConnectionData { /// Connection identifier. - pub id: usize, + pub id: ConnectionId, /// The address of the remote peer. pub peer_addr: Option, /// The public key of the remote peer. @@ -105,7 +108,7 @@ impl Display for ConnectionData { } impl ConnectionData { - pub fn new(id: usize) -> Self { + pub fn new(id: ConnectionId) -> Self { ConnectionData { id, ..ConnectionData::default() From fd969f309ea3c91c25d6ba59c1f5ff6ef9a35de5 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 2 Feb 2024 09:26:25 +0200 Subject: [PATCH 03/10] report any ebt replicate handler error and add connection id to session concluded event --- .../src/actors/replication/ebt/replicator.rs | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs index 6eec9e6..74f8c53 100644 --- a/solar/src/actors/replication/ebt/replicator.rs +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -37,6 +37,8 @@ pub async fn run( let mut ch_msg = ch_msg.ok_or(Error::OptionIsNone)?; + let connection_id = connection_data.id; + let stream_reader = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; let stream_writer = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; let handshake = connection_data @@ -105,9 +107,10 @@ pub async fn run( }, msg = ch_msg.next().fuse() => { // Listen for a 'session initiated' event. 
- if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_, ref ssb_id, ref session_role))) = msg { + if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg { if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder { session_initiated = true; + replicate_req_no = Some(*req_no); } } if let Some(msg) = msg { @@ -125,6 +128,7 @@ pub async fn run( &mut ch_broker, peer_ssb_id.to_owned(), replicate_req_no, + connection_data.id, ) .await { @@ -132,6 +136,12 @@ pub async fn run( Err(err) => { error!("EBT replicate handler failed: {:?}", err); + /* + // TODO: This match is not exhaustive. + // + // We end up in a situation where the connection is not + // cleaned up (`ConnectionEvent::Disconnecting`) and the + // session is not removed. if let Error::EbtReplicate((req_no, err_msg)) = err { ch_broker .send(BrokerEvent::new( @@ -145,7 +155,19 @@ pub async fn run( )) .await?; } - + */ + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::Error( + connection_data, + replicate_req_no, + peer_ssb_id.to_owned(), + err.to_string(), + )), + )) + .await?; // Break out of the input processing loop to conclude // the replication session. 
break; @@ -165,7 +187,10 @@ pub async fn run( ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionTimeout(connection_data)), + BrokerMessage::Ebt(EbtEvent::SessionTimeout( + connection_data, + peer_ssb_id.to_owned(), + )), )) .await?; @@ -178,7 +203,7 @@ pub async fn run( ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionConcluded(peer_ssb_id)), + BrokerMessage::Ebt(EbtEvent::SessionConcluded(connection_id, peer_ssb_id)), )) .await?; From 9f48d07a64e8c9836cb56f5f52283c8ff0a9ac5d Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 2 Feb 2024 09:26:59 +0200 Subject: [PATCH 04/10] use connection id for better session tracking --- solar/src/actors/replication/ebt/manager.rs | 156 ++++++++++++++------ 1 file changed, 113 insertions(+), 43 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index c2d1167..fe6675a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -28,7 +28,10 @@ use serde_json::Value; use crate::{ actors::{ muxrpc::{ReqNo, RpcBlobsGetEvent}, - network::{connection::ConnectionData, connection_manager::ConnectionEvent}, + network::{ + connection::{ConnectionData, ConnectionId}, + connection_manager::ConnectionEvent, + }, replication::{ blobs, ebt::{clock, replicator, EncodedClockValue, VectorClock}, @@ -46,14 +49,14 @@ use crate::{ pub enum EbtEvent { WaitForSessionRequest(ConnectionData), RequestSession(ConnectionData), - SessionInitiated(ReqNo, SsbId, SessionRole), - SendClock(ReqNo, VectorClock), - SendMessage(ReqNo, SsbId, Value), - ReceivedClock(ReqNo, SsbId, VectorClock), + SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole), + SendClock(ConnectionId, ReqNo, VectorClock), + SendMessage(ConnectionId, ReqNo, SsbId, Value), + ReceivedClock(ConnectionId, ReqNo, SsbId, VectorClock), ReceivedMessage(Message), - SessionConcluded(SsbId), - 
SessionTimeout(ConnectionData), - Error(ConnectionData, ReqNo, SsbId, String), + SessionConcluded(ConnectionId, SsbId), + SessionTimeout(ConnectionData, SsbId), + Error(ConnectionData, Option, SsbId, String), } /// Role of a peer in an EBT session. @@ -75,7 +78,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashMap, + active_sessions: HashSet<(ConnectionId, SsbId)>, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -97,6 +100,9 @@ pub struct EbtManager { /// request number. // // TODO: Do we want to remove each entry when the session concludes? + // + // TODO: Rather use connection ID as the key (req no is not unique + // across sessions). sent_clocks: HashMap, /// The sequence number of the latest message sent to each peer /// for each requested feed. @@ -106,7 +112,7 @@ pub struct EbtManager { impl Default for EbtManager { fn default() -> Self { EbtManager { - active_sessions: HashMap::new(), + active_sessions: HashSet::new(), _feed_wait_timeout: 3, _is_replication_loop_active: false, local_clock: HashMap::new(), @@ -149,7 +155,7 @@ impl EbtManager { /// Retrieve either the local vector clock or the stored vector clock /// for the peer represented by the given SSB ID. - fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option { match ssb_id { Some(id) => self.peer_clocks.get(id).cloned(), None => Some(self.local_clock.to_owned()), @@ -228,7 +234,7 @@ impl EbtManager { /// flag. fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { // Retrieve the vector clock for the first peer. - if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) { + if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) { // Check if the second peer is represented in the vector clock. 
if let Some(encoded_seq_no) = clock.get(&ssb_id) { // Check if the receive flag is true. @@ -273,16 +279,21 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + //fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + fn register_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { self.active_sessions - .insert(peer_ssb_id.to_owned(), (req_no, session_role)); + .insert((connection_id, peer_ssb_id.to_owned())); - trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); + trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id); } /// Remove the given peer from the list of active session. - fn remove_session(&mut self, peer_ssb_id: &SsbId) { - let _ = self.active_sessions.remove(peer_ssb_id); + fn remove_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { + // TODO: Clean-up the string story so we're not sprinkling additional + // allocations everywhere. + let _ = self + .active_sessions + .remove(&(connection_id, peer_ssb_id.to_owned())); } /// Revoke a replication request for the feed represented by the given SSB @@ -379,7 +390,10 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. 
- if !self.active_sessions.contains_key(&peer_ssb_id) { + if !self + .active_sessions + .contains(&(connection_data.id, peer_ssb_id)) + { trace!( target: "ebt", "Requesting an EBT session with {:?}", @@ -397,13 +411,14 @@ impl EbtManager { async fn handle_session_initiated( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, session_role: SessionRole, ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id, req_no, session_role.to_owned()); + self.register_session(connection_id, &peer_ssb_id); let local_clock = self.local_clock.to_owned(); match session_role { @@ -411,10 +426,12 @@ impl EbtManager { // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); + trace!(target: "ebt-replication", "Sending clock as responder for request {}", req_no); + ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), )) .await?; } @@ -427,12 +444,19 @@ impl EbtManager { Ok(()) } - fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option { + fn handle_send_clock( + &mut self, + _connection_id: ConnectionId, + req_no: ReqNo, + clock: VectorClock, + ) -> Option { + // TODO: Include connection ID in sent clock tracking. self.sent_clocks.insert(req_no, clock) } async fn handle_received_clock( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, clock: VectorClock, @@ -450,11 +474,17 @@ impl EbtManager { // assumed that the clock was sent in response to a locally-sent // EBT replicate request. Ie. the session was requested by the // local peer. - if !self.active_sessions.contains_key(&peer_ssb_id) { + // + // TODO: Can we avoid the allocation on `peer_ssb_id`? 
+ if !self + .active_sessions + .contains(&(connection_id, peer_ssb_id.to_owned())) + { ch_broker .send(BrokerEvent::new( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, req_no, peer_ssb_id.to_owned(), SessionRole::Requester, @@ -472,7 +502,7 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), )) .await?; } @@ -484,7 +514,12 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, peer_ssb_id.to_owned(), msg)), + BrokerMessage::Ebt(EbtEvent::SendMessage( + connection_id, + req_no, + peer_ssb_id.to_owned(), + msg, + )), )) .await?; } @@ -564,6 +599,10 @@ impl EbtManager { Ok(()) } + /* + TODO: Reintroduce this when we figure out the connection ID / request ID + association. + /// Look up the latest sequence number for the updated feed, encode it as /// the single entry of a vector clock and send that to any active session /// peers. 
@@ -599,14 +638,26 @@ impl EbtManager { Ok(()) } + */ - async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { - trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); - self.remove_session(&peer_ssb_id); + async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) { + trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id); + self.remove_session(connection_id, &peer_ssb_id); } - async fn handle_session_timeout(&mut self, connection_data: ConnectionData) -> Result<()> { - trace!(target: "ebt-replication", "Session timeout while waiting for request"); + async fn handle_session_timeout( + &mut self, + connection_data: ConnectionData, + peer_ssb_id: SsbId, + ) -> Result<()> { + trace!(target: "ebt-replication", "Session timeout while waiting for request from {} on connection {}", peer_ssb_id, connection_data.id); + + // Session should not have been initiated in the first place, meaning + // that this removal action should be unnecessary. Keeping it here + // for now out of caution. + // + // TODO: Remove this line when it's clear that it's not needed. + self.remove_session(connection_data.id, &peer_ssb_id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); @@ -625,15 +676,28 @@ impl EbtManager { async fn handle_error( &mut self, connection_data: ConnectionData, - req_no: ReqNo, + req_no: Option, peer_ssb_id: SsbId, err_msg: String, ) -> Result<()> { - trace!(target: "ebt-replication", "Session error with {} for request number {}: {}", peer_ssb_id, req_no, err_msg); + trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, err_msg); + + self.remove_session(connection_data.id, &peer_ssb_id); // Create channel to send messages to broker. 
let mut ch_broker = BROKER.lock().await.create_sender(); + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + )) + .await?; + + // TODO: Match on error variant and fallback to class replication if + // the error was `Error::EbtReplicate`. + + /* // Fallback to classic replication. ch_broker .send(BrokerEvent::new( @@ -641,6 +705,7 @@ impl EbtManager { BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), )) .await?; + */ Ok(()) } @@ -684,17 +749,17 @@ impl EbtManager { EbtEvent::RequestSession(connection_data) => { self.handle_request_session(connection_data).await; } - EbtEvent::SessionInitiated(req_no, peer_ssb_id, session_role) => { - if let Err(err) = self.handle_session_initiated(req_no, peer_ssb_id, session_role).await { + EbtEvent::SessionInitiated(connection_id, req_no, peer_ssb_id, session_role) => { + if let Err(err) = self.handle_session_initiated(connection_id, req_no, peer_ssb_id, session_role).await { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(req_no, clock) => { + EbtEvent::SendClock(connection_id, req_no, clock) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - let _ = self.handle_send_clock(req_no, clock); + let _ = self.handle_send_clock(connection_id, req_no, clock); } - EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { - if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { + EbtEvent::ReceivedClock(connection_id, req_no, peer_ssb_id, clock) => { + if let Err(err) = self.handle_received_clock(connection_id, req_no, peer_ssb_id, clock).await { error!("Error while handling 'received clock' event: {}", err) } } @@ -703,17 +768,17 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { + EbtEvent::SendMessage(_connection_id, _req_no, 
peer_ssb_id, msg) => { trace!(target: "ebt-replication", "Sending message: {:?}...", msg); if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { error!("Error while handling 'send message' event: {}", err) } } - EbtEvent::SessionConcluded(connection_data) => { - self.handle_session_concluded(connection_data).await; + EbtEvent::SessionConcluded(connection_id, peer_ssb_id) => { + self.handle_session_concluded(connection_id, peer_ssb_id).await; } - EbtEvent::SessionTimeout(connection_data) => { - if let Err(err) = self.handle_session_timeout(connection_data).await { + EbtEvent::SessionTimeout(connection_data, peer_ssb_id) => { + if let Err(err) = self.handle_session_timeout(connection_data, peer_ssb_id).await { error!("Error while handling 'session timeout' event: {}", err) } } @@ -723,14 +788,19 @@ impl EbtManager { } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg { + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { debug!("Received KV store event from broker"); + /* + TODO: Reintroduce this later, once Manyverse restart + issue is solved. + // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. if let Err(err) = self.handle_local_store_updated(ssb_id).await { error!("Error while handling 'local store updated' event: {}", err) } + */ } } } From 5cab202714d0aea1ec64e2b5276b9da6e0317051 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Sun, 4 Feb 2024 09:18:01 +0200 Subject: [PATCH 05/10] fix the request number sign based on session role --- solar/src/actors/muxrpc/ebt.rs | 43 +++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index 61fc4ba..7d5a49b 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -104,7 +104,22 @@ where } // Handle a broker message. 
RpcInput::Message(msg) => match msg { - BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock)) => { + BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock, session_role)) => { + // This is, regrettably, rather unintuitive. + // + // `api.ebt_clock_res_send()` internally calls + // `send_response()` which applies the negative sign to the + // given request number. However, the request number should + // always be positive when we are acting as the requester - + // even though we are sending a "response". This is why we + // apply a negative here for the session requester: so that + // the double negation results in a response with a + // positive request number. + let req_no = match session_role { + SessionRole::Requester => -(*req_no), + SessionRole::Responder => *req_no, + }; + // Only send the clock if the associated connection is // being handled by this instance of the handler. // @@ -115,14 +130,31 @@ where // Serialize the vector clock as a JSON string. let json_clock = serde_json::to_string(&clock)?; // The request number must be negative (response). - api.ebt_clock_res_send(-(*req_no), &json_clock).await?; + api.ebt_clock_res_send(req_no, &json_clock).await?; - trace!(target: "ebt", "Sent clock to connection {} with request number {}", conn_id, req_no); + trace!(target: "ebt", "Sent clock to connection {} with request number {} as {}", conn_id, req_no, session_role); } Ok(false) } - BrokerMessage::Ebt(EbtEvent::SendMessage(conn_id, req_no, ssb_id, msg)) => { + BrokerMessage::Ebt(EbtEvent::SendMessage( + conn_id, + req_no, + ssb_id, + msg, + session_role, + )) => { + // Define the sign of the request number based on session + // role (note: this is the opposite sign of the number + // that will ultimately be sent. + // + // See the comment in the `SendClock` event above for + // further explantation. 
+ let req_no = match session_role { + SessionRole::Requester => -(*req_no), + SessionRole::Responder => *req_no, + }; + // Only send the message if the associated connection is // being handled by this instance of the handler. // @@ -136,8 +168,7 @@ where // Ensure the message is sent to the correct peer. if peer_ssb_id == *ssb_id { let json_msg = msg.to_string(); - // The request number must be negative (response). - api.ebt_feed_res_send(-(*req_no), &json_msg).await?; + api.ebt_feed_res_send(req_no, &json_msg).await?; trace!(target: "ebt", "Sent message to {} on connection {}", ssb_id, conn_id); } From 312fc0e405bf8511ca2b2e8a437475aea9b04ae3 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Sun, 4 Feb 2024 09:19:21 +0200 Subject: [PATCH 06/10] track session role and improve session error matching --- solar/src/actors/replication/ebt/manager.rs | 193 +++++++++++--------- 1 file changed, 109 insertions(+), 84 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index fe6675a..c02e980 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -44,19 +44,21 @@ use crate::{ Error, Result, }; +type ErrorMsg = String; + /// EBT replication events. #[derive(Debug, Clone)] pub enum EbtEvent { WaitForSessionRequest(ConnectionData), RequestSession(ConnectionData), SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole), - SendClock(ConnectionId, ReqNo, VectorClock), - SendMessage(ConnectionId, ReqNo, SsbId, Value), + SendClock(ConnectionId, ReqNo, VectorClock, SessionRole), + SendMessage(ConnectionId, ReqNo, SsbId, Value, SessionRole), ReceivedClock(ConnectionId, ReqNo, SsbId, VectorClock), ReceivedMessage(Message), SessionConcluded(ConnectionId, SsbId), SessionTimeout(ConnectionData, SsbId), - Error(ConnectionData, Option, SsbId, String), + Error(ConnectionData, Option, SsbId, ErrorMsg), } /// Role of a peer in an EBT session. 
@@ -78,7 +80,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashSet<(ConnectionId, SsbId)>, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -97,13 +99,12 @@ pub struct EbtManager { /// Duration to wait for a connected peer to initiate an EBT session. session_wait_timeout: u64, /// The latest vector clock sent for each session, identified by the - /// request number. - // - // TODO: Do we want to remove each entry when the session concludes? + /// connection ID. + // TODO: Should we include the SsbId? Then we can track clocks sent to + // each peer across multiple sessions. // - // TODO: Rather use connection ID as the key (req no is not unique - // across sessions). - sent_clocks: HashMap, + // Based on current usage, this could just be a HashSet of ConnectionId. + sent_clocks: HashMap, /// The sequence number of the latest message sent to each peer /// for each requested feed. sent_messages: HashMap>, @@ -112,7 +113,7 @@ pub struct EbtManager { impl Default for EbtManager { fn default() -> Self { EbtManager { - active_sessions: HashSet::new(), + active_sessions: HashMap::new(), _feed_wait_timeout: 3, _is_replication_loop_active: false, local_clock: HashMap::new(), @@ -280,20 +281,33 @@ impl EbtManager { /// Register a new EBT session for the given peer. 
//fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { - fn register_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { + fn register_session( + &mut self, + connection_id: ConnectionId, + peer_ssb_id: &SsbId, + session_role: SessionRole, + ) { self.active_sessions - .insert((connection_id, peer_ssb_id.to_owned())); + .insert(connection_id, (peer_ssb_id.to_owned(), session_role)); trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id); } /// Remove the given peer from the list of active session. - fn remove_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) { + fn remove_session(&mut self, connection_id: ConnectionId) { // TODO: Clean-up the string story so we're not sprinkling additional // allocations everywhere. - let _ = self - .active_sessions - .remove(&(connection_id, peer_ssb_id.to_owned())); + let _ = self.active_sessions.remove(&connection_id); + } + + /// Return the role of the local peer for the active session (represented + /// by connection ID). + fn session_role(&self, connection_id: ConnectionId) -> Option { + if let Some((_ssb_id, session_role)) = self.active_sessions.get(&connection_id) { + Some(session_role.to_owned()) + } else { + None + } } /// Revoke a replication request for the feed represented by the given SSB @@ -390,14 +404,11 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. 
- if !self - .active_sessions - .contains(&(connection_data.id, peer_ssb_id)) - { + if !self.active_sessions.contains_key(&connection_data.id) { trace!( target: "ebt", - "Requesting an EBT session with {:?}", - connection_data.peer_public_key.unwrap() + "Requesting an EBT session with {}", + peer_ssb_id ); task::spawn(replicator::run( @@ -418,7 +429,7 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(connection_id, &peer_ssb_id); + self.register_session(connection_id, &peer_ssb_id, session_role.to_owned()); let local_clock = self.local_clock.to_owned(); match session_role { @@ -431,7 +442,12 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock( + connection_id, + req_no, + local_clock, + SessionRole::Responder, + )), )) .await?; } @@ -446,12 +462,10 @@ impl EbtManager { fn handle_send_clock( &mut self, - _connection_id: ConnectionId, - req_no: ReqNo, + connection_id: ConnectionId, clock: VectorClock, ) -> Option { - // TODO: Include connection ID in sent clock tracking. - self.sent_clocks.insert(req_no, clock) + self.sent_clocks.insert(connection_id, clock) } async fn handle_received_clock( @@ -469,40 +483,43 @@ impl EbtManager { // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); - // If a clock is received without a prior EBT replicate - // request having been received from the associated peer, it is - // assumed that the clock was sent in response to a locally-sent - // EBT replicate request. Ie. the session was requested by the - // local peer. - // - // TODO: Can we avoid the allocation on `peer_ssb_id`? 
- if !self - .active_sessions - .contains(&(connection_id, peer_ssb_id.to_owned())) - { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionInitiated( - connection_id, - req_no, - peer_ssb_id.to_owned(), - SessionRole::Requester, - )), - )) - .await?; - } + // TODO: What if we initiated a session as requester when sending + // replicate request? That might simplify things. + let session_role = match self.session_role(connection_id) { + Some(role) => role, + None => { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, + req_no, + peer_ssb_id.to_owned(), + SessionRole::Requester, + )), + )) + .await?; + + SessionRole::Requester + } + }; - // If we have not previously sent a clock, send one now. + // If we have not previously sent a clock during this connection, + // send one now. Connection is used here as a proxy for session. // // This indicates that the local peer is acting as the session // requester.
- if self.sent_clocks.get(&req_no).is_none() { + if self.sent_clocks.get(&connection_id).is_none() { let local_clock = self.local_clock.to_owned(); ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock( + connection_id, + req_no, + local_clock, + SessionRole::Requester, + )), )) .await?; } @@ -519,6 +536,7 @@ impl EbtManager { req_no, peer_ssb_id.to_owned(), msg, + session_role.to_owned(), )), )) .await?; @@ -642,7 +660,7 @@ impl EbtManager { async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) { trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id); - self.remove_session(connection_id, &peer_ssb_id); + self.remove_session(connection_id); } async fn handle_session_timeout( @@ -657,7 +675,7 @@ impl EbtManager { // for now out of caution. // // TODO: Remove this line when it's clear that it's not needed. - self.remove_session(connection_data.id, &peer_ssb_id); + self.remove_session(connection_data.id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); @@ -678,34 +696,41 @@ impl EbtManager { connection_data: ConnectionData, req_no: Option, peer_ssb_id: SsbId, - err_msg: String, + error_msg: ErrorMsg, ) -> Result<()> { - trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, err_msg); + trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, error_msg); - self.remove_session(connection_data.id, &peer_ssb_id); + self.remove_session(connection_data.id); // Create channel to send messages to broker. 
let mut ch_broker = BROKER.lock().await.create_sender(); - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), - )) - .await?; - - // TODO: Match on error variant and fallback to class replication if - // the error was `Error::EbtReplicate`. - - /* - // Fallback to classic replication. - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), - )) - .await?; - */ + if error_msg.starts_with("Serde JSON error") + || error_msg.starts_with("EBT replication error") + { + // Either the received EBT replicate request was invalid or the sent + // EBT replicate request received an error response from the remote + // peer. + // + // Fallback to classic replication. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), + )) + .await?; + } else { + // Something else went wrong. Kill the connection. + // + // TODO: In the future we may want to match on other specific error + // variants. For now, this is good enough. 
+ ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + )) + .await?; + } Ok(()) } @@ -754,9 +779,9 @@ impl EbtManager { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(connection_id, req_no, clock) => { + EbtEvent::SendClock(connection_id, _req_no, clock, _session_role) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - let _ = self.handle_send_clock(connection_id, req_no, clock); + let _ = self.handle_send_clock(connection_id, clock); } EbtEvent::ReceivedClock(connection_id, req_no, peer_ssb_id, clock) => { if let Err(err) = self.handle_received_clock(connection_id, req_no, peer_ssb_id, clock).await { @@ -768,7 +793,7 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_connection_id, _req_no, peer_ssb_id, msg) => { + EbtEvent::SendMessage(_connection_id, _req_no, peer_ssb_id, msg, _session_role) => { trace!(target: "ebt-replication", "Sending message: {:?}...", msg); if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { error!("Error while handling 'send message' event: {}", err) @@ -782,8 +807,8 @@ impl EbtManager { error!("Error while handling 'session timeout' event: {}", err) } } - EbtEvent::Error(connection_data, req_no, peer_ssb_id, err_msg) => { - if let Err(err) = self.handle_error(connection_data, req_no, peer_ssb_id, err_msg).await { + EbtEvent::Error(connection_data, req_no, peer_ssb_id, error_msg) => { + if let Err(err) = self.handle_error(connection_data, req_no, peer_ssb_id, error_msg).await { error!("Error while handling 'error' event: {}", err) } } From 4abff6c747c75bc5af647a4b7f0e99aeed0c7cb4 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Sun, 4 Feb 2024 09:20:32 +0200 Subject: [PATCH 07/10] remove ebt error event for replicate --- .../src/actors/replication/ebt/replicator.rs | 24 +++---------------- 1 file 
changed, 3 insertions(+), 21 deletions(-) diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs index 74f8c53..2cd9839 100644 --- a/solar/src/actors/replication/ebt/replicator.rs +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -83,6 +83,7 @@ pub async fn run( // received. let ebt_session_start = Instant::now(); + // TODO: Could this be moved into the MUXRPC EBT handler? if let SessionRole::Requester = session_role { // Send EBT request. let ebt_args = EbtReplicate::default(); @@ -136,27 +137,6 @@ pub async fn run( Err(err) => { error!("EBT replicate handler failed: {:?}", err); - /* - // TODO: This match is not exhaustive. - // - // We end up in a situation where the connection is not - // cleaned up (`ConnectionEvent::Disconnecting`) and the - // session is not removed. - if let Error::EbtReplicate((req_no, err_msg)) = err { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::Error( - connection_data, - req_no, - peer_ssb_id.to_owned(), - err_msg, - )), - )) - .await?; - } - */ - ch_broker .send(BrokerEvent::new( Destination::Broadcast, @@ -200,6 +180,8 @@ pub async fn run( } } + // TODO: Consider including session role in SessionConcluded so that we can + // await another request if acting as the responder. ch_broker .send(BrokerEvent::new( Destination::Broadcast, From 90d4fb68b2b089783331f0041c734f11e59fde89 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Tue, 6 Feb 2024 08:55:48 +0200 Subject: [PATCH 08/10] separate handler code into methods --- solar/src/actors/muxrpc/ebt.rs | 135 ++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 61 deletions(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index 7d5a49b..d15eb46 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -1,12 +1,12 @@ //! Epidemic Broadcast Tree (EBT) Replication Handler. 
-use std::{collections::HashMap, marker::PhantomData}; +use std::marker::PhantomData; use async_std::io::Write; use futures::SinkExt; use kuska_ssb::{ api::{ - dto::{self, content::SsbId}, + dto::{self}, ApiCaller, ApiMethod, }, feed::{Feed as MessageKvt, Message}, @@ -30,7 +30,10 @@ where W: Write + Unpin + Send + Sync, { /// EBT-related requests which are known and allowed. - active_requests: HashMap, + // TODO: Include connection ID as key. Then we can remove request ID from + // all `EbtEvent` variants and simply look-up the request ID associated + // with the connection ID (as defined in the `EbtEvent` data). + active_request: ReqNo, phantom: PhantomData, } @@ -38,6 +41,14 @@ impl EbtReplicateHandler where W: Write + Unpin + Send + Sync, { + /// Instantiate a new instance of `EbtReplicateHandler`. + pub fn new() -> Self { + Self { + active_request: 0, + phantom: PhantomData, + } + } + /// Handle an RPC event. pub async fn handle( &mut self, @@ -45,49 +56,27 @@ where op: &RpcInput, ch_broker: &mut ChBrokerSend, peer_ssb_id: String, - active_request: Option, connection_id: usize, + active_req_no: Option, ) -> Result { trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op); // An outbound EBT replicate request was made before the handler was - // called; add it to the map of active requests. - if let Some(session_req_no) = active_request { - let _ = self - .active_requests - .insert(session_req_no, peer_ssb_id.to_owned()); + // called. + if let Some(req_no) = active_req_no { + self.active_request = req_no } match op { // Handle an incoming MUXRPC request. RpcInput::Network(req_no, rpc::RecvMsg::RpcRequest(req)) => { - match ApiMethod::from_rpc_body(req) { - Some(ApiMethod::EbtReplicate) => { - self.recv_ebtreplicate(api, *req_no, req, peer_ssb_id, connection_id) - .await - } - _ => Ok(false), - } + self.recv_rpc_request(api, *req_no, req, peer_ssb_id, connection_id) + .await } + // Handle an incoming 'other' MUXRPC request.
RpcInput::Network(req_no, rpc::RecvMsg::OtherRequest(_type, req)) => { - // Attempt to deserialize bytes into vector clock hashmap. - // If the deserialization is successful, emit a 'received clock' - // event. - if let Ok(clock) = serde_json::from_slice(req) { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock( - connection_id, - *req_no, - peer_ssb_id, - clock, - )), - )) - .await?; - } - - Ok(false) + self.recv_other_request(ch_broker, *req_no, req, peer_ssb_id, connection_id) + .await } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { @@ -162,16 +151,10 @@ where // whom we have an active session and matching request // number. if *conn_id == connection_id { - // TODO: Remove this check; made redundant by the - // connection ID check. - // - // Ensure the message is sent to the correct peer. - if peer_ssb_id == *ssb_id { - let json_msg = msg.to_string(); - api.ebt_feed_res_send(req_no, &json_msg).await?; - - trace!(target: "ebt", "Sent message to {} on connection {}", ssb_id, conn_id); - } + let json_msg = msg.to_string(); + api.ebt_feed_res_send(req_no, &json_msg).await?; + + trace!(target: "ebt", "Sent message to {} on connection {}", ssb_id, conn_id); } Ok(false) @@ -181,17 +164,22 @@ where _ => Ok(false), } } -} -impl EbtReplicateHandler -where - W: Write + Unpin + Send + Sync, -{ - /// Instantiate a new instance of `EbtReplicateHandler`. - pub fn new() -> Self { - Self { - active_requests: HashMap::new(), - phantom: PhantomData, + /// Process an incoming MUXRPC request. 
+ async fn recv_rpc_request( + &mut self, + api: &mut ApiCaller, + req_no: ReqNo, + req: &rpc::Body, + peer_ssb_id: String, + connection_id: usize, + ) -> Result { + match ApiMethod::from_rpc_body(req) { + Some(ApiMethod::EbtReplicate) => { + self.recv_ebtreplicate(api, req_no, req, peer_ssb_id, connection_id) + .await + } + _ => Ok(false), } } @@ -230,9 +218,8 @@ where trace!(target: "ebt-handler", "Successfully validated replicate request arguments"); - // Insert the request number and peer public key into the active - // requests map. - self.active_requests.insert(req_no, peer_ssb_id.to_owned()); + // Set the request number for this session. + self.active_request = req_no; ch_broker .send(BrokerEvent::new( @@ -249,6 +236,35 @@ where Ok(false) } + /// Process an incoming MUXRPC request containing a vector clock. + async fn recv_other_request( + &mut self, + ch_broker: &mut ChBrokerSend, + req_no: ReqNo, + req: &[u8], + peer_ssb_id: String, + connection_id: usize, + ) -> Result { + // Attempt to deserialize bytes into vector clock hashmap. + // If the deserialization is successful, emit a 'received clock' + // event. + if let Ok(clock) = serde_json::from_slice(req) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedClock( + connection_id, + req_no, + peer_ssb_id, + clock, + )), + )) + .await?; + } + + Ok(false) + } + /// Process an incoming MUXRPC response. /// The response is expected to contain a vector clock or an SSB message. async fn recv_rpc_response( @@ -264,7 +280,7 @@ where // Only handle the response if the associated request number is known // to us, either because we sent or received the initiating replicate // request. - if self.active_requests.contains_key(&req_no) { + if self.active_request == req_no || self.active_request == -(req_no) { // The response may be a vector clock (aka. notes) or an SSB message. 
// // Since there is no explicit way to determine which was received, @@ -313,7 +329,6 @@ where async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no); - self.active_requests.remove(&req_no); api.rpc().send_stream_eof(-req_no).await?; Ok(true) @@ -324,8 +339,6 @@ where async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result { warn!("Received MUXRPC error response: {}", err_msg); - self.active_requests.remove(&req_no); - Err(Error::EbtReplicate((req_no, err_msg.to_string()))) } } From 65e4da268dd273ecf4409427f9f14779e98ffb09 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Tue, 6 Feb 2024 08:56:35 +0200 Subject: [PATCH 09/10] simplify the error handling --- solar/src/actors/replication/ebt/manager.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index c02e980..5200a1a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -51,6 +51,8 @@ type ErrorMsg = String; pub enum EbtEvent { WaitForSessionRequest(ConnectionData), RequestSession(ConnectionData), + // TODO: See if we can remove `ReqNo` from all these events. + // Then `ReqNo` lives purely inside the MUXRPC EBT handler. SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole), SendClock(ConnectionId, ReqNo, VectorClock, SessionRole), SendMessage(ConnectionId, ReqNo, SsbId, Value, SessionRole), @@ -58,7 +60,7 @@ pub enum EbtEvent { ReceivedMessage(Message), SessionConcluded(ConnectionId, SsbId), SessionTimeout(ConnectionData, SsbId), - Error(ConnectionData, Option, SsbId, ErrorMsg), + Error(ConnectionData, SsbId, ErrorMsg), } /// Role of a peer in an EBT session. 
@@ -694,11 +696,10 @@ impl EbtManager { async fn handle_error( &mut self, connection_data: ConnectionData, - req_no: Option, peer_ssb_id: SsbId, error_msg: ErrorMsg, ) -> Result<()> { - trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, error_msg); + trace!(target: "ebt-replication", "Session error with {}: {}", peer_ssb_id, error_msg); self.remove_session(connection_data.id); @@ -807,8 +808,8 @@ impl EbtManager { error!("Error while handling 'session timeout' event: {}", err) } } - EbtEvent::Error(connection_data, req_no, peer_ssb_id, error_msg) => { - if let Err(err) = self.handle_error(connection_data, req_no, peer_ssb_id, error_msg).await { + EbtEvent::Error(connection_data, peer_ssb_id, error_msg) => { + if let Err(err) = self.handle_error(connection_data, peer_ssb_id, error_msg).await { error!("Error while handling 'error' event: {}", err) } } From 165bb4b15554038cefbe6f57da0c0387019d67e0 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Tue, 6 Feb 2024 08:57:21 +0200 Subject: [PATCH 10/10] change variable names slightly --- solar/src/actors/replication/ebt/replicator.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs index 2cd9839..1a7cf08 100644 --- a/solar/src/actors/replication/ebt/replicator.rs +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -75,7 +75,7 @@ pub async fn run( trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id); let mut session_initiated = false; - let mut replicate_req_no = None; + let mut active_req_no = None; // Record the time at which we begin the EBT session. // @@ -83,15 +83,13 @@ pub async fn run( // received. let ebt_session_start = Instant::now(); - // TODO: Could this be moved into the MUXRPC EBT handler? if let SessionRole::Requester = session_role { // Send EBT request. 
let ebt_args = EbtReplicate::default(); let req_no = api.ebt_replicate_req_send(&ebt_args).await?; - // Set the request number to be passed into the MUXRPC EBT handler. - // This allows tracking of the request (ensuring we respond to - // MUXRPC responses with this request number). - replicate_req_no = Some(req_no); + + // Set the request number for this session. + active_req_no = Some(req_no); } loop { @@ -111,7 +109,7 @@ pub async fn run( if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg { if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder { session_initiated = true; - replicate_req_no = Some(*req_no); + active_req_no = Some(*req_no); } } if let Some(msg) = msg { @@ -127,9 +125,11 @@ pub async fn run( &mut api, &input, &mut ch_broker, + // TODO: Can we remove this? + // We could look it up from the connection ID instead. peer_ssb_id.to_owned(), - replicate_req_no, connection_data.id, + active_req_no, ) .await { @@ -142,7 +142,6 @@ pub async fn run( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::Error( connection_data, - replicate_req_no, peer_ssb_id.to_owned(), err.to_string(), )),