diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index e2deec4..d15eb46 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -1,12 +1,12 @@ //! Epidemic Broadcast Tree (EBT) Replication Handler. -use std::{collections::HashMap, marker::PhantomData}; +use std::marker::PhantomData; use async_std::io::Write; use futures::SinkExt; use kuska_ssb::{ api::{ - dto::{self, content::SsbId}, + dto::{self}, ApiCaller, ApiMethod, }, feed::{Feed as MessageKvt, Message}, @@ -30,7 +30,10 @@ where W: Write + Unpin + Send + Sync, { /// EBT-related requests which are known and allowed. - active_requests: HashMap, + // TODO: Include connection ID as key. Then we can remove request ID from + // all `EbtEvent` variants and simply look-up the request ID associated + // with the connection ID (as defined in the `EbtEvent` data). + active_request: ReqNo, phantom: PhantomData, } @@ -38,6 +41,14 @@ impl EbtReplicateHandler where W: Write + Unpin + Send + Sync, { + /// Instantiate a new instance of `EbtReplicateHandler`. + pub fn new() -> Self { + Self { + active_request: 0, + phantom: PhantomData, + } + } + /// Handle an RPC event. pub async fn handle( &mut self, @@ -45,50 +56,31 @@ where op: &RpcInput, ch_broker: &mut ChBrokerSend, peer_ssb_id: String, - active_request: Option, + connection_id: usize, + active_req_no: Option, ) -> Result { trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op); // An outbound EBT replicate request was made before the handler was - // called; add it to the map of active requests. - if let Some(session_req_no) = active_request { - let _ = self - .active_requests - .insert(session_req_no, peer_ssb_id.to_owned()); + // called. + if let Some(req_no) = active_req_no { + self.active_request = req_no } match op { // Handle an incoming MUXRPC request. RpcInput::Network(req_no, rpc::RecvMsg::RpcRequest(req)) => { - match ApiMethod::from_rpc_body(req) { - Some(ApiMethod::EbtReplicate) => { - self.recv_ebtreplicate(api, *req_no, req, peer_ssb_id).await - } - _ => Ok(false), - } + self.recv_rpc_request(api, *req_no, req, peer_ssb_id, connection_id) + .await } + // Hanlde an incoming 'other' MUXRPC request. RpcInput::Network(req_no, rpc::RecvMsg::OtherRequest(_type, req)) => { - // Attempt to deserialize bytes into vector clock hashmap. - // If the deserialization is successful, emit a 'received clock' - // event. - if let Ok(clock) = serde_json::from_slice(req) { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock( - *req_no, - peer_ssb_id, - clock, - )), - )) - .await?; - } - - Ok(false) + self.recv_other_request(ch_broker, *req_no, req, peer_ssb_id, connection_id) + .await } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id) + self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id, connection_id) .await } // Handle an incoming MUXRPC 'cancel stream' response. @@ -101,20 +93,68 @@ where } // Handle a broker message. RpcInput::Message(msg) => match msg { - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => { - // Serialize the vector clock as a JSON string. - let json_clock = serde_json::to_string(&clock)?; - // The request number must be negative (response). - api.ebt_clock_res_send(-(*req_no), &json_clock).await?; + BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock, session_role)) => { + // This is, regrettably, rather unintuitive. + // + // `api.ebt_clock_res_send()` internally calls + // `send_response()` which applies the negative sign to the + // given request number. However, the request number should + // always be positive when we are acting as the requester - + // even though we are sending a "response". This is why we + // apply a negative here for the session requester: so that + // the double negation results in a response with a + // positive request number. + let req_no = match session_role { + SessionRole::Requester => -(*req_no), + SessionRole::Responder => *req_no, + }; + + // Only send the clock if the associated connection is + // being handled by this instance of the handler. + // + // This prevents the clock being sent to every peer with + // whom we have an active session and matching request + // number. + if *conn_id == connection_id { + // Serialize the vector clock as a JSON string. + let json_clock = serde_json::to_string(&clock)?; + // The request number must be negative (response). + api.ebt_clock_res_send(req_no, &json_clock).await?; + + trace!(target: "ebt", "Sent clock to connection {} with request number {} as {}", conn_id, req_no, session_role); + } Ok(false) } - BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, ssb_id, msg)) => { - // Ensure the message is sent to the correct peer. - if peer_ssb_id == *ssb_id { + BrokerMessage::Ebt(EbtEvent::SendMessage( + conn_id, + req_no, + ssb_id, + msg, + session_role, + )) => { + // Define the sign of the request number based on session + // role (note: this is the opposite sign of the number + // that will ultimately be sent. + // + // See the comment in the `SendClock` event above for + // further explantation. + let req_no = match session_role { + SessionRole::Requester => -(*req_no), + SessionRole::Responder => *req_no, + }; + + // Only send the message if the associated connection is + // being handled by this instance of the handler. + // + // This prevents the message being sent to every peer with + // whom we have an active session and matching request + // number. + if *conn_id == connection_id { let json_msg = msg.to_string(); - // The request number must be negative (response). - api.ebt_feed_res_send(-(*req_no), &json_msg).await?; + api.ebt_feed_res_send(req_no, &json_msg).await?; + + trace!(target: "ebt", "Sent message to {} on connection {}", ssb_id, conn_id); } Ok(false) @@ -124,17 +164,22 @@ where _ => Ok(false), } } -} -impl EbtReplicateHandler -where - W: Write + Unpin + Send + Sync, -{ - /// Instantiate a new instance of `EbtReplicateHandler`. - pub fn new() -> Self { - Self { - active_requests: HashMap::new(), - phantom: PhantomData, + /// Process an incoming MUXRPC request. + async fn recv_rpc_request( + &mut self, + api: &mut ApiCaller, + req_no: ReqNo, + req: &rpc::Body, + peer_ssb_id: String, + connection_id: usize, + ) -> Result { + match ApiMethod::from_rpc_body(req) { + Some(ApiMethod::EbtReplicate) => { + self.recv_ebtreplicate(api, req_no, req, peer_ssb_id, connection_id) + .await + } + _ => Ok(false), } } @@ -145,6 +190,7 @@ where req_no: ReqNo, req: &rpc::Body, peer_ssb_id: String, + connection_id: usize, ) -> Result { // Deserialize the args from an incoming EBT replicate request. let mut args: Vec = serde_json::from_value(req.args.clone())?; @@ -172,14 +218,14 @@ where trace!(target: "ebt-handler", "Successfully validated replicate request arguments"); - // Insert the request number and peer public key into the active - // requests map. - self.active_requests.insert(req_no, peer_ssb_id.to_owned()); + // Set the request number for this session. + self.active_request = req_no; ch_broker .send(BrokerEvent::new( Destination::Broadcast, BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, req_no, peer_ssb_id, SessionRole::Responder, @@ -190,6 +236,35 @@ where Ok(false) } + /// Process an incoming MUXRPC request containing a vector clock. + async fn recv_other_request( + &mut self, + ch_broker: &mut ChBrokerSend, + req_no: ReqNo, + req: &[u8], + peer_ssb_id: String, + connection_id: usize, + ) -> Result { + // Attempt to deserialize bytes into vector clock hashmap. + // If the deserialization is successful, emit a 'received clock' + // event. + if let Ok(clock) = serde_json::from_slice(req) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedClock( + connection_id, + req_no, + peer_ssb_id, + clock, + )), + )) + .await?; + } + + Ok(false) + } + /// Process an incoming MUXRPC response. /// The response is expected to contain a vector clock or an SSB message. async fn recv_rpc_response( @@ -198,13 +273,14 @@ where req_no: ReqNo, res: &[u8], peer_ssb_id: String, + connection_id: usize, ) -> Result { trace!(target: "ebt-handler", "Received RPC response: {}", req_no); // Only handle the response if the associated request number is known // to us, either because we sent or received the initiating replicate // request. - if self.active_requests.contains_key(&req_no) { + if self.active_request == req_no || self.active_request == -(req_no) { // The response may be a vector clock (aka. notes) or an SSB message. // // Since there is no explicit way to determine which was received, @@ -214,7 +290,12 @@ where ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), + BrokerMessage::Ebt(EbtEvent::ReceivedClock( + connection_id, + req_no, + peer_ssb_id, + clock, + )), )) .await?; } else { @@ -243,11 +324,12 @@ where Ok(false) } - /// Close the stream and remove the associated request from the map of - /// active requests. + /// Remove the associated request from the map of active requests and close + /// the stream. async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { + trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no); + api.rpc().send_stream_eof(-req_no).await?; - self.active_requests.remove(&req_no); Ok(true) } @@ -257,82 +339,6 @@ where async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result { warn!("Received MUXRPC error response: {}", err_msg); - self.active_requests.remove(&req_no); - Err(Error::EbtReplicate((req_no, err_msg.to_string()))) } - - /* - /// Process an incoming MUXRPC response. The response is expected to - /// contain an SSB message. - async fn recv_rpc_response( - &mut self, - _api: &mut ApiCaller, - ch_broker: &mut ChBrokerSend, - req_no: i32, - res: &[u8], - ) -> Result { - // Only handle the response if we made the request. - if self.peers.contains_key(&req_no) { - // First try to deserialize the response into a message value. - // If that fails, try to deserialize into a message KVT and then - // convert that into a message value. Return an error if that fails. - // This approach allows us to handle the unlikely event that - // messages are sent as KVTs and not simply values. - // - // Validation of the message signature and fields is also performed - // as part of the call to `from_slice`. - let msg = match Message::from_slice(res) { - Ok(msg) => msg, - Err(_) => MessageKvt::from_slice(res)?.into_message()?, - }; - - // Retrieve the sequence number of the most recent message for - // the peer that authored the received message. - let last_seq = KV_STORE - .read() - .await - .get_latest_seq(&msg.author().to_string())? - .unwrap_or(0); - - // Validate the sequence number. - if msg.sequence() == last_seq + 1 { - // Append the message to the feed. - KV_STORE.write().await.append_feed(msg.clone()).await?; - - info!( - "received msg number {} from {}", - msg.sequence(), - msg.author() - ); - - // Extract blob references from the received message and - // request those blobs if they are not already in the local - // blobstore. - for key in self.extract_blob_refs(&msg) { - if !BLOB_STORE.read().await.exists(&key) { - let event = super::blobs_get::RpcBlobsGetEvent::Get(dto::BlobsGetIn { - key, - size: None, - max: None, - }); - let broker_msg = BrokerEvent::new(Destination::Broadcast, event); - ch_broker.send(broker_msg).await.unwrap(); - } - } - } else { - warn!( - "received out-of-order msg from {}; recv: {} db: {}", - &msg.author().to_string(), - msg.sequence(), - last_seq - ); - } - - Ok(true) - } else { - Ok(false) - } - } - */ } diff --git a/solar/src/actors/network/connection.rs b/solar/src/actors/network/connection.rs index 17911c6..79c9895 100644 --- a/solar/src/actors/network/connection.rs +++ b/solar/src/actors/network/connection.rs @@ -60,11 +60,14 @@ impl Display for TcpConnection { } } +/// Unique ID for a connection. +pub type ConnectionId = usize; + /// Connection data. #[derive(Debug, Default, Clone)] pub struct ConnectionData { /// Connection identifier. - pub id: usize, + pub id: ConnectionId, /// The address of the remote peer. pub peer_addr: Option, /// The public key of the remote peer. @@ -105,7 +108,7 @@ impl Display for ConnectionData { } impl ConnectionData { - pub fn new(id: usize) -> Self { + pub fn new(id: ConnectionId) -> Self { ConnectionData { id, ..ConnectionData::default() diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index c2d1167..5200a1a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -28,7 +28,10 @@ use serde_json::Value; use crate::{ actors::{ muxrpc::{ReqNo, RpcBlobsGetEvent}, - network::{connection::ConnectionData, connection_manager::ConnectionEvent}, + network::{ + connection::{ConnectionData, ConnectionId}, + connection_manager::ConnectionEvent, + }, replication::{ blobs, ebt::{clock, replicator, EncodedClockValue, VectorClock}, @@ -41,19 +44,23 @@ use crate::{ Error, Result, }; +type ErrorMsg = String; + /// EBT replication events. #[derive(Debug, Clone)] pub enum EbtEvent { WaitForSessionRequest(ConnectionData), RequestSession(ConnectionData), - SessionInitiated(ReqNo, SsbId, SessionRole), - SendClock(ReqNo, VectorClock), - SendMessage(ReqNo, SsbId, Value), - ReceivedClock(ReqNo, SsbId, VectorClock), + // TODO: See if we can remove `ReqNo` from all these events. + // Then `ReqNo` lives purely inside the MUXRPC EBT handler. + SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole), + SendClock(ConnectionId, ReqNo, VectorClock, SessionRole), + SendMessage(ConnectionId, ReqNo, SsbId, Value, SessionRole), + ReceivedClock(ConnectionId, ReqNo, SsbId, VectorClock), ReceivedMessage(Message), - SessionConcluded(SsbId), - SessionTimeout(ConnectionData), - Error(ConnectionData, ReqNo, SsbId, String), + SessionConcluded(ConnectionId, SsbId), + SessionTimeout(ConnectionData, SsbId), + Error(ConnectionData, SsbId, ErrorMsg), } /// Role of a peer in an EBT session. @@ -75,7 +82,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashMap, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -94,10 +101,12 @@ pub struct EbtManager { /// Duration to wait for a connected peer to initiate an EBT session. session_wait_timeout: u64, /// The latest vector clock sent for each session, identified by the - /// request number. + /// connection ID. + // TODO: Should we include the SsbId? Then we can track clocks sent to + // each peer across multiple sessions. // - // TODO: Do we want to remove each entry when the session concludes? - sent_clocks: HashMap, + // Based on current usage, this could just be a HashSet of ConnectionId. + sent_clocks: HashMap, /// The sequence number of the latest message sent to each peer /// for each requested feed. sent_messages: HashMap>, @@ -149,7 +158,7 @@ impl EbtManager { /// Retrieve either the local vector clock or the stored vector clock /// for the peer represented by the given SSB ID. - fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option { match ssb_id { Some(id) => self.peer_clocks.get(id).cloned(), None => Some(self.local_clock.to_owned()), @@ -228,7 +237,7 @@ impl EbtManager { /// flag. fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { // Retrieve the vector clock for the first peer. - if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) { + if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) { // Check if the second peer is represented in the vector clock. if let Some(encoded_seq_no) = clock.get(&ssb_id) { // Check if the receive flag is true. @@ -273,16 +282,34 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + //fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + fn register_session( + &mut self, + connection_id: ConnectionId, + peer_ssb_id: &SsbId, + session_role: SessionRole, + ) { self.active_sessions - .insert(peer_ssb_id.to_owned(), (req_no, session_role)); + .insert(connection_id, (peer_ssb_id.to_owned(), session_role)); - trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); + trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id); } /// Remove the given peer from the list of active session. - fn remove_session(&mut self, peer_ssb_id: &SsbId) { - let _ = self.active_sessions.remove(peer_ssb_id); + fn remove_session(&mut self, connection_id: ConnectionId) { + // TODO: Clean-up the string story so we're not sprinkling additional + // allocations everywhere. + let _ = self.active_sessions.remove(&connection_id); + } + + /// Return the role of the local peer for the active session (represented + /// by connection ID). + fn session_role(&self, connection_id: ConnectionId) -> Option { + if let Some((_ssb_id, session_role)) = self.active_sessions.get(&connection_id) { + Some(session_role.to_owned()) + } else { + None + } } /// Revoke a replication request for the feed represented by the given SSB @@ -379,11 +406,11 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. - if !self.active_sessions.contains_key(&peer_ssb_id) { + if !self.active_sessions.contains_key(&connection_data.id) { trace!( target: "ebt", - "Requesting an EBT session with {:?}", - connection_data.peer_public_key.unwrap() + "Requesting an EBT session with {}", + peer_ssb_id ); task::spawn(replicator::run( @@ -397,13 +424,14 @@ impl EbtManager { async fn handle_session_initiated( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, session_role: SessionRole, ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id, req_no, session_role.to_owned()); + self.register_session(connection_id, &peer_ssb_id, session_role.to_owned()); let local_clock = self.local_clock.to_owned(); match session_role { @@ -411,10 +439,17 @@ impl EbtManager { // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); + trace!(target: "ebt-replication", "Sending clock as responder for request {}", req_no); + ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock( + connection_id, + req_no, + local_clock, + SessionRole::Responder, + )), )) .await?; } @@ -427,12 +462,17 @@ impl EbtManager { Ok(()) } - fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option { - self.sent_clocks.insert(req_no, clock) + fn handle_send_clock( + &mut self, + connection_id: ConnectionId, + clock: VectorClock, + ) -> Option { + self.sent_clocks.insert(connection_id, clock) } async fn handle_received_clock( &mut self, + connection_id: ConnectionId, req_no: ReqNo, peer_ssb_id: SsbId, clock: VectorClock, @@ -445,34 +485,43 @@ impl EbtManager { // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); - // If a clock is received without a prior EBT replicate - // request having been received from the associated peer, it is - // assumed that the clock was sent in response to a locally-sent - // EBT replicate request. Ie. the session was requested by the - // local peer. - if !self.active_sessions.contains_key(&peer_ssb_id) { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionInitiated( - req_no, - peer_ssb_id.to_owned(), - SessionRole::Requester, - )), - )) - .await?; - } + // TODO: What if we initiated a session as requester when sending + // replicate request? That might simply things. + let session_role = match self.session_role(connection_id) { + Some(role) => role, + None => { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionInitiated( + connection_id, + req_no, + peer_ssb_id.to_owned(), + SessionRole::Requester, + )), + )) + .await?; + + SessionRole::Requester + } + }; - // If we have not previously sent a clock, send one now. + // If we have not previously sent a clock during this connection, + // send one now. Connection is used here as a proxy for session. // // This indicates that the local peer is acting as the session // requester. - if self.sent_clocks.get(&req_no).is_none() { + if self.sent_clocks.get(&connection_id).is_none() { let local_clock = self.local_clock.to_owned(); ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + BrokerMessage::Ebt(EbtEvent::SendClock( + connection_id, + req_no, + local_clock, + SessionRole::Requester, + )), )) .await?; } @@ -484,7 +533,13 @@ impl EbtManager { ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, peer_ssb_id.to_owned(), msg)), + BrokerMessage::Ebt(EbtEvent::SendMessage( + connection_id, + req_no, + peer_ssb_id.to_owned(), + msg, + session_role.to_owned(), + )), )) .await?; } @@ -564,6 +619,10 @@ impl EbtManager { Ok(()) } + /* + TODO: Reintroduce this when we figure out the connection ID / request ID + association. + /// Look up the latest sequence number for the updated feed, encode it as /// the single entry of a vector clock and send that to any active session /// peers. @@ -599,14 +658,26 @@ impl EbtManager { Ok(()) } + */ - async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { - trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); - self.remove_session(&peer_ssb_id); + async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) { + trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id); + self.remove_session(connection_id); } - async fn handle_session_timeout(&mut self, connection_data: ConnectionData) -> Result<()> { - trace!(target: "ebt-replication", "Session timeout while waiting for request"); + async fn handle_session_timeout( + &mut self, + connection_data: ConnectionData, + peer_ssb_id: SsbId, + ) -> Result<()> { + trace!(target: "ebt-replication", "Session timeout while waiting for request from {} on connection {}", peer_ssb_id, connection_data.id); + + // Session should not have been initiated in the first place, meaning + // that this removal action should be unnecessary. Keeping it here + // for now out of caution. + // + // TODO: Remove this line when it's clear that it's not needed. + self.remove_session(connection_data.id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); @@ -625,22 +696,42 @@ impl EbtManager { async fn handle_error( &mut self, connection_data: ConnectionData, - req_no: ReqNo, peer_ssb_id: SsbId, - err_msg: String, + error_msg: ErrorMsg, ) -> Result<()> { - trace!(target: "ebt-replication", "Session error with {} for request number {}: {}", peer_ssb_id, req_no, err_msg); + trace!(target: "ebt-replication", "Session error with {}: {}", peer_ssb_id, error_msg); + + self.remove_session(connection_data.id); // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); - // Fallback to classic replication. - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), - )) - .await?; + if error_msg.starts_with("Serde JSON error") + || error_msg.starts_with("EBT replication error") + { + // Either the received EBT replicate request was invalid or the sent + // EBT replicate request received an error response from the remote + // peer. + // + // Fallback to classic replication. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), + )) + .await?; + } else { + // Something else went wrong. Kill the connection. + // + // TODO: In the future we may want to match on other specific error + // variants. For now, this is good enough. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + )) + .await?; + } Ok(()) } @@ -684,17 +775,17 @@ impl EbtManager { EbtEvent::RequestSession(connection_data) => { self.handle_request_session(connection_data).await; } - EbtEvent::SessionInitiated(req_no, peer_ssb_id, session_role) => { - if let Err(err) = self.handle_session_initiated(req_no, peer_ssb_id, session_role).await { + EbtEvent::SessionInitiated(connection_id, req_no, peer_ssb_id, session_role) => { + if let Err(err) = self.handle_session_initiated(connection_id, req_no, peer_ssb_id, session_role).await { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(req_no, clock) => { + EbtEvent::SendClock(connection_id, _req_no, clock, _session_role) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - let _ = self.handle_send_clock(req_no, clock); + let _ = self.handle_send_clock(connection_id, clock); } - EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { - if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { + EbtEvent::ReceivedClock(connection_id, req_no, peer_ssb_id, clock) => { + if let Err(err) = self.handle_received_clock(connection_id, req_no, peer_ssb_id, clock).await { error!("Error while handling 'received clock' event: {}", err) } } @@ -703,34 +794,39 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { + EbtEvent::SendMessage(_connection_id, _req_no, peer_ssb_id, msg, _session_role) => { trace!(target: "ebt-replication", "Sending message: {:?}...", msg); if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { error!("Error while handling 'send message' event: {}", err) } } - EbtEvent::SessionConcluded(connection_data) => { - self.handle_session_concluded(connection_data).await; + EbtEvent::SessionConcluded(connection_id, peer_ssb_id) => { + self.handle_session_concluded(connection_id, peer_ssb_id).await; } - EbtEvent::SessionTimeout(connection_data) => { - if let Err(err) = self.handle_session_timeout(connection_data).await { + EbtEvent::SessionTimeout(connection_data, peer_ssb_id) => { + if let Err(err) = self.handle_session_timeout(connection_data, peer_ssb_id).await { error!("Error while handling 'session timeout' event: {}", err) } } - EbtEvent::Error(connection_data, req_no, peer_ssb_id, err_msg) => { - if let Err(err) = self.handle_error(connection_data, req_no, peer_ssb_id, err_msg).await { + EbtEvent::Error(connection_data, peer_ssb_id, error_msg) => { + if let Err(err) = self.handle_error(connection_data, peer_ssb_id, error_msg).await { error!("Error while handling 'error' event: {}", err) } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg { + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { debug!("Received KV store event from broker"); + /* + TODO: Reintroduce this later, once Manyverse restart + issue is solved. + // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. if let Err(err) = self.handle_local_store_updated(ssb_id).await { error!("Error while handling 'local store updated' event: {}", err) } + */ } } } diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs index 6eec9e6..1a7cf08 100644 --- a/solar/src/actors/replication/ebt/replicator.rs +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -37,6 +37,8 @@ pub async fn run( let mut ch_msg = ch_msg.ok_or(Error::OptionIsNone)?; + let connection_id = connection_data.id; + let stream_reader = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; let stream_writer = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; let handshake = connection_data @@ -73,7 +75,7 @@ pub async fn run( trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id); let mut session_initiated = false; - let mut replicate_req_no = None; + let mut active_req_no = None; // Record the time at which we begin the EBT session. // @@ -85,10 +87,9 @@ pub async fn run( // Send EBT request. let ebt_args = EbtReplicate::default(); let req_no = api.ebt_replicate_req_send(&ebt_args).await?; - // Set the request number to be passed into the MUXRPC EBT handler. - // This allows tracking of the request (ensuring we respond to - // MUXRPC responses with this request number). - replicate_req_no = Some(req_no); + + // Set the request number for this session. + active_req_no = Some(req_no); } loop { @@ -105,9 +106,10 @@ pub async fn run( }, msg = ch_msg.next().fuse() => { // Listen for a 'session initiated' event. - if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_, ref ssb_id, ref session_role))) = msg { + if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg { if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder { session_initiated = true; + active_req_no = Some(*req_no); } } if let Some(msg) = msg { @@ -123,8 +125,11 @@ pub async fn run( &mut api, &input, &mut ch_broker, + // TODO: Can we remove this? + // We could look it up from the connection ID instead. peer_ssb_id.to_owned(), - replicate_req_no, + connection_data.id, + active_req_no, ) .await { @@ -132,20 +137,16 @@ pub async fn run( Err(err) => { error!("EBT replicate handler failed: {:?}", err); - if let Error::EbtReplicate((req_no, err_msg)) = err { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::Error( - connection_data, - req_no, - peer_ssb_id.to_owned(), - err_msg, - )), - )) - .await?; - } - + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::Error( + connection_data, + peer_ssb_id.to_owned(), + err.to_string(), + )), + )) + .await?; // Break out of the input processing loop to conclude // the replication session. break; @@ -165,7 +166,10 @@ pub async fn run( ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionTimeout(connection_data)), + BrokerMessage::Ebt(EbtEvent::SessionTimeout( + connection_data, + peer_ssb_id.to_owned(), + )), )) .await?; @@ -175,10 +179,12 @@ pub async fn run( } } + // TODO: Consider including session role in SessionConcluded so that we can + // await another request if acting as the responder. ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SessionConcluded(peer_ssb_id)), + BrokerMessage::Ebt(EbtEvent::SessionConcluded(connection_id, peer_ssb_id)), )) .await?;