diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 5200a1a..e9ac783 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -82,7 +82,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashMap, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -158,7 +158,7 @@ impl EbtManager { /// Retrieve either the local vector clock or the stored vector clock /// for the peer represented by the given SSB ID. - fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { match ssb_id { Some(id) => self.peer_clocks.get(id).cloned(), None => Some(self.local_clock.to_owned()), @@ -201,6 +201,7 @@ impl EbtManager { let mut clock_file = File::open(&clock_entry.path())?; let mut clock_file_contents = String::new(); clock_file.read_to_string(&mut clock_file_contents)?; + // TODO: Match on error and delete file. let clock: VectorClock = serde_json::from_str(&clock_file_contents)?; // Set the vector clock in memory. @@ -233,21 +234,21 @@ impl EbtManager { } /// Retrieve the stored vector clock for the first peer, check for the - /// second peer in the vector clock and return the value of the receive - /// flag. - fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { + /// second peer in the vector clock. If the receive flag is set to true, + /// return the decoded sequence number. + fn is_receiving(&self, peer_ssb_id: &SsbId, ssb_id: &SsbId) -> Result> { // Retrieve the vector clock for the first peer. - if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) { + if let Some(clock) = self.get_clock(Some(peer_ssb_id)) { // Check if the second peer is represented in the vector clock. - if let Some(encoded_seq_no) = clock.get(&ssb_id) { + if let Some(encoded_seq_no) = clock.get(ssb_id) { // Check if the receive flag is true. - if let (_replicate_flag, Some(true), _seq) = clock::decode(*encoded_seq_no)? { - return Ok(true); + if let (_replicate_flag, Some(true), seq) = clock::decode(*encoded_seq_no)? { + return Ok(seq); } } } - Ok(false) + Ok(None) } /// Get the sequence number of the latest message sent to the given @@ -282,30 +283,27 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - //fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { fn register_session( &mut self, connection_id: ConnectionId, - peer_ssb_id: &SsbId, + peer_ssb_id: SsbId, session_role: SessionRole, + req_no: ReqNo, ) { - self.active_sessions - .insert(connection_id, (peer_ssb_id.to_owned(), session_role)); - trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id); + self.active_sessions + .insert(connection_id, (peer_ssb_id, session_role, req_no)); } /// Remove the given peer from the list of active session. fn remove_session(&mut self, connection_id: ConnectionId) { - // TODO: Clean-up the string story so we're not sprinkling additional - // allocations everywhere. let _ = self.active_sessions.remove(&connection_id); } /// Return the role of the local peer for the active session (represented /// by connection ID). fn session_role(&self, connection_id: ConnectionId) -> Option { - if let Some((_ssb_id, session_role)) = self.active_sessions.get(&connection_id) { + if let Some((_ssb_id, session_role, _req_no)) = self.active_sessions.get(&connection_id) { Some(session_role.to_owned()) } else { None @@ -431,7 +429,7 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(connection_id, &peer_ssb_id, session_role.to_owned()); + self.register_session(connection_id, peer_ssb_id, session_role.to_owned(), req_no); let local_clock = self.local_clock.to_owned(); match session_role { @@ -619,46 +617,41 @@ impl EbtManager { Ok(()) } - /* - TODO: Reintroduce this when we figure out the connection ID / request ID - association. + /// Check if any active session peers are interested in the updated feed. + /// If so, send them the appended message. + async fn handle_local_store_updated(&self, ssb_id: SsbId, msg_seq: u64) -> Result<()> { + // TODO: This is all radically inefficient, but it's a start. - /// Look up the latest sequence number for the updated feed, encode it as - /// the single entry of a vector clock and send that to any active session - /// peers. - async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> { // Iterate over all active EBT sessions. - for (_peer_ssb_id, (req_no, _session_role)) in self.active_sessions.iter() { - // Look up the latest sequence for the given ID. - if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? { - // Encode the replicate flag, receive flag and sequence. - let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?; - - // Update the entry for `ssb_id` in the local vector clock. - if let Some(mut local_clock) = self.get_clock(None) { - local_clock.insert(ssb_id.to_owned(), encoded_value); + for (connection_id, (peer_ssb_id, session_role, req_no)) in self.active_sessions.iter() { + // Check if `peer_ssb_id` wants to replicate `ssb_id`. + if let Some(seq) = self.is_receiving(peer_ssb_id, &ssb_id)? { + if msg_seq > seq { + // Retrieve the message from the key-value store. + if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(&ssb_id, msg_seq)? { + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send the single-entry vector clock to the active session. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendMessage( + *connection_id, + *req_no, + peer_ssb_id.to_owned(), + msg_kvt.value, + session_role.to_owned(), + )), + )) + .await?; + } } - - // Create a vector clock with a single entry. - let mut updated_clock = HashMap::new(); - updated_clock.insert(ssb_id.to_owned(), encoded_value); - - // Create channel to send messages to broker. - let mut ch_broker = BROKER.lock().await.create_sender(); - - // Send the single-entry vector clock to the active session. - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendClock(*req_no, updated_clock)), - )) - .await?; } } Ok(()) } - */ async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) { trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id); @@ -814,19 +807,14 @@ impl EbtManager { } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, seq)))) = msg { debug!("Received KV store event from broker"); - /* - TODO: Reintroduce this later, once Manyverse restart - issue is solved. - // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. - if let Err(err) = self.handle_local_store_updated(ssb_id).await { + if let Err(err) = self.handle_local_store_updated(ssb_id, seq).await { error!("Error while handling 'local store updated' event: {}", err) } - */ } } }