From d3b33eeebd3c2e89c25e2f89a697cbf741fb7cc2 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 27 Dec 2023 08:53:09 +0200 Subject: [PATCH 01/13] remove kv store change listener and handler --- solar/src/actors/muxrpc/ebt.rs | 59 ++-------------------------------- 1 file changed, 2 insertions(+), 57 deletions(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index a7bbfd2..db682e8 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -88,7 +88,7 @@ where } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(api, ch_broker, *req_no, res, peer_ssb_id) + self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id) .await } // Handle an incoming MUXRPC 'cancel stream' response. @@ -119,21 +119,6 @@ where } _ => Ok(false), }, - /* - RpcInput::Message(msg) => { - if let Some(kv_event) = msg.downcast_ref::() { - match kv_event { - // Notification from the key-value store indicating that - // a new message has just been appended to the feed - // identified by `id`. - StoreKvEvent::IdChanged(id) => { - return self.recv_storageevent_idchanged(api, id).await - } - } - } - Ok(false) - } - */ _ => Ok(false), } } @@ -207,7 +192,6 @@ where /// contain a vector clock or an SSB message. async fn recv_rpc_response( &mut self, - _api: &mut ApiCaller, ch_broker: &mut ChBrokerSend, req_no: ReqNo, res: &[u8], @@ -263,6 +247,7 @@ where async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { api.rpc().send_stream_eof(-req_no).await?; self.active_requests.remove(&req_no); + Ok(true) } @@ -277,22 +262,6 @@ where } /* - /// Extract blob references from post-type messages. - fn extract_blob_refs(&mut self, msg: &Message) -> Vec { - let mut refs = Vec::new(); - - let msg = serde_json::from_value(msg.content().clone()); - - if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg { - for cap in BLOB_REGEX.captures_iter(&text) { - let key = cap.get(0).unwrap().as_str().to_owned(); - refs.push(key); - } - } - - refs - } - /// Process an incoming MUXRPC response. The response is expected to /// contain an SSB message. async fn recv_rpc_response( @@ -365,28 +334,4 @@ where } } */ - - /* - /// Respond to a key-value store state change for the given peer. - /// This is triggered when a new message is appended to the local feed. - /// Remove the peer from the list of active streams, send the requested - /// messages from the local feed to the peer and then reinsert the public - /// key of the peer to the list of active streams. - async fn recv_storageevent_idchanged( - &mut self, - api: &mut ApiCaller, - id: &str, - ) -> Result { - // Attempt to remove the peer from the list of active streams. - if let Some(mut req) = self.reqs.remove(id) { - // Send local messages to the peer. - self.send_history(api, &mut req).await?; - // Reinsert the peer into the list of active streams. 
- self.reqs.insert(id.to_string(), req); - Ok(true) - } else { - Ok(false) - } - } - */ } From 5b1bd0f33410d917b1281e9dc68e14b29f3be654 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 27 Dec 2023 08:53:46 +0200 Subject: [PATCH 02/13] track state of sent messages and listen for kv store change event --- solar/src/actors/replication/ebt/manager.rs | 65 +++++++++++++++++---- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 7172a8a..87d34ce 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -34,7 +34,8 @@ use crate::{ broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, config::PEERS_TO_REPLICATE, node::{BLOB_STORE, KV_STORE}, - Result, + storage::kv::StoreKvEvent, + Error, Result, }; /// EBT replication events. @@ -89,6 +90,9 @@ pub struct EbtManager { _requested_feeds: HashSet, /// Duration to wait for a connected peer to initiate an EBT session. session_wait_timeout: u64, + /// The sequence number of the latest message sent to each peer + /// for each requested feed. + sent_messages: HashMap>, } impl Default for EbtManager { @@ -102,6 +106,7 @@ impl Default for EbtManager { peer_clocks: HashMap::new(), _requested_feeds: HashSet::new(), session_wait_timeout: 5, + sent_messages: HashMap::new(), } } } @@ -209,13 +214,17 @@ impl EbtManager { for (feed_id, encoded_seq_no) in clock.iter() { if *encoded_seq_no != -1 { // Decode the encoded vector clock sequence number. - // TODO: Match properly on the values of replicate_flag and receive_flag. - let (_replicate_flag, _receive_flag, sequence) = clock::decode(*encoded_seq_no)?; - if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { - if let Some(seq) = sequence { - for n in seq..(last_seq + 1) { - if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { - messages_to_be_sent.push(msg_kvt.value) + let (_replicate_flag, receive_flag, sequence) = clock::decode(*encoded_seq_no)?; + // Only send messages if the receive flag is true. + if let Some(true) = receive_flag { + if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { + if let Some(seq) = sequence { + for n in seq..(last_seq + 1) { + if let Some(msg_kvt) = + KV_STORE.read().await.get_msg_kvt(feed_id, n)? + { + messages_to_be_sent.push(msg_kvt.value) + } } } } @@ -340,6 +349,31 @@ impl EbtManager { Ok(()) } + async fn handle_send_message(&mut self, peer_ssb_id: SsbId, msg: Value) -> Result<()> { + // Update the hashmap of sent messages. + // + // For each peer, keep a list of feed ID's and the sequence of the + // latest sent message for each. This is useful to consult when a new + // message is appended to the local store and may need to be sent to + // peers with whom we have an active EBT session. + + let msg_author = msg["author"] + .as_str() + .ok_or(Error::OptionIsNone)? 
+ .to_string(); + let msg_sequence = msg["sequence"].as_u64().ok_or(Error::OptionIsNone)?; + + if let Some(feeds) = self.sent_messages.get_mut(&peer_ssb_id) { + feeds.insert(msg_author, msg_sequence); + } else { + let mut feeds = HashMap::new(); + feeds.insert(msg_author, msg_sequence); + self.sent_messages.insert(peer_ssb_id, feeds); + } + + Ok(()) + } + async fn handle_received_message(&mut self, msg: Message) -> Result<()> { trace!(target: "ebt-replication", "Received message: {:?}", msg); @@ -490,9 +524,11 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_req_no, _peer_ssb_id, _msg) => { + EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { trace!(target: "ebt-replication", "Sending message..."); - // TODO: Update sent messages. + if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { + error!("Error while handling 'send message' event: {}", err) + } } EbtEvent::SessionConcluded(connection_data) => { self.handle_session_concluded(connection_data).await; @@ -508,6 +544,15 @@ impl EbtManager { } } } + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { + // Notification from the key-value store indicating that + // a new message has just been appended to the feed + // identified by `ssb_id`. + debug!("Received KV store event from broker"); + todo!() + // Respond to a key-value store state change for the given peer. + // This is triggered when a new message is appended to the local feed. + //self.handle_local_store_updated(ssb_id).await; } } } From 94380d1a73103635745917f0073be88cd5fb2455 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 28 Dec 2023 09:01:21 +0200 Subject: [PATCH 03/13] add the json msg value to the store-updated event type --- solar/src/storage/kv.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/solar/src/storage/kv.rs b/solar/src/storage/kv.rs index e92249c..c48f726 100644 --- a/solar/src/storage/kv.rs +++ b/solar/src/storage/kv.rs @@ -2,6 +2,7 @@ use futures::SinkExt; use kuska_ssb::feed::{Feed as MessageKvt, Message as MessageValue}; use log::{debug, warn}; use serde::{Deserialize, Serialize}; +use serde_json::Value; use sled::{Config as DbConfig, Db}; use crate::{ @@ -25,8 +26,10 @@ const PREFIX_PEER: u8 = 4u8; /// The feed belonging to the given SSB ID has changed /// (ie. a new message has been appended to the feed). +/// +/// The JSON value of the appended message is included. #[derive(Debug, Clone)] -pub struct StoreKvEvent(pub String); +pub struct StoreKvEvent(pub (String, Value)); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlobStatus { @@ -285,7 +288,7 @@ impl KvStorage { // key has been updated. 
let broker_msg = BrokerEvent::new( Destination::Broadcast, - BrokerMessage::StoreKv(StoreKvEvent(author)), + BrokerMessage::StoreKv(StoreKvEvent((author, msg_kvt.value))), ); // Matching on the error here (instead of unwrapping) allows us to From edfabb7f1d04370a8b685dba1e8a25efa5e76984 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 28 Dec 2023 09:02:05 +0200 Subject: [PATCH 04/13] handle the msg in the store-updated event tuple --- solar/src/actors/muxrpc/history_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solar/src/actors/muxrpc/history_stream.rs b/solar/src/actors/muxrpc/history_stream.rs index 5fcae43..f68cc02 100644 --- a/solar/src/actors/muxrpc/history_stream.rs +++ b/solar/src/actors/muxrpc/history_stream.rs @@ -85,7 +85,7 @@ where self.recv_error_response(api, *req_no, err).await } // Handle a broker message. - RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(id))) => { + RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent((id, _msg)))) => { // Notification from the key-value store indicating that // a new message has just been appended to the feed // identified by `id`. From 8aa1bce514a3eff06be70c285cd249445db84b6e Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 28 Dec 2023 09:03:45 +0200 Subject: [PATCH 05/13] add tracking of req_no to active sessions and implement store-updated event handler --- solar/src/actors/replication/ebt/manager.rs | 164 +++++++++++++++----- 1 file changed, 125 insertions(+), 39 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 87d34ce..568ca0e 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -72,7 +72,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashSet, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -98,7 +98,7 @@ pub struct EbtManager { impl Default for EbtManager { fn default() -> Self { EbtManager { - active_sessions: HashSet::new(), + active_sessions: HashMap::new(), _feed_wait_timeout: 3, _is_replication_loop_active: false, local_clock: HashMap::new(), @@ -143,9 +143,9 @@ impl EbtManager { } /// Retrieve the vector clock for the given SSB ID. - fn _get_clock(self, peer_id: &SsbId) -> Option { + fn get_clock(&self, peer_id: &SsbId) -> Option { if peer_id == &self.local_id { - Some(self.local_clock) + Some(self.local_clock.to_owned()) } else { self.peer_clocks.get(peer_id).cloned() } @@ -160,6 +160,37 @@ impl EbtManager { } } + /// Retrieve the stored vector clock for the first peer, check for the + /// second peer in the vector clock and return the value of the receive + /// flag. + fn is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { + // Retrieve the vector clock for the first peer. + if let Some(clock) = self.get_clock(&peer_ssb_id) { + // Check if the second peer is represented in the vector clock. + if let Some(encoded_seq_no) = clock.get(&ssb_id) { + // Check if the receive flag is true. + if let (_replicate_flag, Some(true), _seq) = clock::decode(*encoded_seq_no)? { + return Ok(true); + } + } + } + + Ok(false) + } + + /// Get the sequence number of the latest message sent to the given + /// peer SSB ID for the feed represented by the given SSB ID. 
+ fn _get_latest_sent_seq(self, peer_ssb_id: &SsbId, ssb_id: &SsbId) -> Option { + // Get the state of the messages sent to `peer_ssb_id`. + if let Some(sent_state) = self.sent_messages.get(peer_ssb_id) { + // Get the sequence number of the latest message sent for feed + // `ssb_id`. + sent_state.get(ssb_id).copied() + } else { + None + } + } + /// Request that the feed represented by the given SSB ID be replicated. async fn replicate(&mut self, peer_id: &SsbId) -> Result<()> { // Look up the latest sequence for the given ID. @@ -179,10 +210,10 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - fn register_session(&mut self, peer_ssb_id: &SsbId) { - self.active_sessions.insert(peer_ssb_id.to_owned()); + fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo) { + self.active_sessions.insert(peer_ssb_id.to_owned(), req_no); - trace!(target: "ebt-session", "Registered new EBT session for {}", peer_ssb_id); + trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); } /// Remove the given peer from the list of active session. @@ -201,34 +232,63 @@ impl EbtManager { self._requested_feeds.insert(peer_id.to_owned()); } + /// Decode the encoded sequence number from a vector clock and push + /// the latest desired messages to the given vector of messages. + /// + /// This method will only push messages to the vector if the replicate + /// flag is set to `true`. + async fn retrieve_latest_messages( + encoded_seq_no: i64, + feed_id: &SsbId, + messages: &mut Vec, + ) -> Result<()> { + if encoded_seq_no != -1 { + if let (_replicate_flag, Some(true), Some(seq)) = clock::decode(encoded_seq_no)? { + if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { + for n in seq..(last_seq + 1) { + if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { + messages.push(msg_kvt.value) + } + } + } + } + } + + Ok(()) + } + /// Decode a peer's vector clock and retrieve all requested messages. + /// + /// If an SSB ID is supplied, retrieve only the lastest requested + /// messages authored by that ID. + /// + /// If no SSB ID is supplied, retrieve the latest requested messages + /// for all authors listed in the vector clock. async fn retrieve_requested_messages( - // TODO: Do we need these two parameters? - _req_no: &ReqNo, - _peer_ssb_id: &SsbId, + peer_ssb_id: Option<&SsbId>, clock: VectorClock, ) -> Result> { let mut messages_to_be_sent = Vec::new(); - // Iterate over all key-value pairs in the vector clock. - for (feed_id, encoded_seq_no) in clock.iter() { - if *encoded_seq_no != -1 { - // Decode the encoded vector clock sequence number. - let (_replicate_flag, receive_flag, sequence) = clock::decode(*encoded_seq_no)?; - // Only send messages if the receive flag is true. - if let Some(true) = receive_flag { - if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { - if let Some(seq) = sequence { - for n in seq..(last_seq + 1) { - if let Some(msg_kvt) = - KV_STORE.read().await.get_msg_kvt(feed_id, n)? - { - messages_to_be_sent.push(msg_kvt.value) - } - } - } - } - } + // We only want to retrieve messages authored by `peer_ssb_id`. + if let Some(feed_id) = peer_ssb_id { + if let Some(encoded_seq_no) = clock.get(feed_id) { + EbtManager::retrieve_latest_messages( + *encoded_seq_no, + feed_id, + &mut messages_to_be_sent, + ) + .await?; + } + } else { + // We want to retrieve messages for all feeds in the vector clock. 
+ for (feed_id, encoded_seq_no) in clock.iter() { + EbtManager::retrieve_latest_messages( + *encoded_seq_no, + feed_id, + &mut messages_to_be_sent, + ) + .await?; } } @@ -256,7 +316,7 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. - if !self.active_sessions.contains(&peer_ssb_id) { + if !self.active_sessions.contains_key(&peer_ssb_id) { trace!( target: "ebt", "Requesting an EBT session with {:?}", @@ -281,7 +341,7 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id); + self.register_session(&peer_ssb_id, req_no); let local_clock = self.local_clock.to_owned(); match session_role { @@ -321,7 +381,7 @@ impl EbtManager { // assumed that the clock was sent in response to a locally-sent // EBT replicate request. Ie. The session was requested by the // local peer. - if !self.active_sessions.contains(&peer_ssb_id) { + if !self.active_sessions.contains_key(&peer_ssb_id) { ch_broker .send(BrokerEvent::new( Destination::Broadcast, @@ -336,7 +396,9 @@ impl EbtManager { self.set_clock(&peer_ssb_id, clock.to_owned()); - let msgs = EbtManager::retrieve_requested_messages(&req_no, &peer_ssb_id, clock).await?; + // We want messages for all feeds in the clock, therefore the + // `peer_ssb_id` parameter is set to `None`. + let msgs = EbtManager::retrieve_requested_messages(None, clock).await?; for msg in msgs { ch_broker .send(BrokerEvent::new( @@ -421,6 +483,31 @@ impl EbtManager { Ok(()) } + async fn handle_local_store_updated(&self, ssb_id: SsbId, msg: Value) -> Result<()> { + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Iterate over all active EBT sessions. + for (peer_ssb_id, req_no) in self.active_sessions.iter() { + // If `ssb_id` appears in the vector clock of `peer_ssb_id` and + // the receive flag is set to `true`, send the message. + if self.is_receiving(peer_ssb_id.to_owned(), ssb_id.to_owned())? { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendMessage( + *req_no, + peer_ssb_id.to_owned(), + msg.to_owned(), + )), + )) + .await?; + } + } + + Ok(()) + } + async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); self.remove_session(&peer_ssb_id); @@ -544,15 +631,14 @@ impl EbtManager { } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg { - // Notification from the key-value store indicating that - // a new message has just been appended to the feed - // identified by `ssb_id`. + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, msg)))) = msg { debug!("Received KV store event from broker"); - todo!() + // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. 
- //self.handle_local_store_updated(ssb_id).await; + if let Err(err) = self.handle_local_store_updated(ssb_id, msg).await { + error!("Error while handling 'local store updated' event: {}", err) + } } } } From 8a57484c230cdd34a756b14e9a6a03ad9e32188f Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 28 Dec 2023 09:10:19 +0200 Subject: [PATCH 06/13] add warning comment about potential race condition --- solar/src/actors/replication/ebt/manager.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 568ca0e..a62b033 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -484,6 +484,15 @@ impl EbtManager { } async fn handle_local_store_updated(&self, ssb_id: SsbId, msg: Value) -> Result<()> { + // TODO: Beware of a possible race condition in this implementation. + // + // We are not considering the sequence number of the last message sent + // to any active sessions for the given feed (ie. `ssb_id`). + // + // This means that we may inadvertently end up sending out-of-order + // messages if this message is sent before the rest of the requested + // feed history. + // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); From 01d760cf0e8352fdb13980978f3829748888e271 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 29 Dec 2023 09:47:27 +0200 Subject: [PATCH 07/13] remove msg value from store kv event --- solar/src/storage/kv.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/solar/src/storage/kv.rs b/solar/src/storage/kv.rs index c48f726..f208f33 100644 --- a/solar/src/storage/kv.rs +++ b/solar/src/storage/kv.rs @@ -2,7 +2,6 @@ use futures::SinkExt; use kuska_ssb::feed::{Feed as MessageKvt, Message as MessageValue}; use log::{debug, warn}; use serde::{Deserialize, Serialize}; -use serde_json::Value; use sled::{Config as DbConfig, Db}; use crate::{ @@ -29,7 +28,7 @@ const PREFIX_PEER: u8 = 4u8; /// /// The JSON value of the appended message is included. #[derive(Debug, Clone)] -pub struct StoreKvEvent(pub (String, Value)); +pub struct StoreKvEvent(pub String); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlobStatus { @@ -288,7 +287,7 @@ impl KvStorage { // key has been updated. let broker_msg = BrokerEvent::new( Destination::Broadcast, - BrokerMessage::StoreKv(StoreKvEvent((author, msg_kvt.value))), + BrokerMessage::StoreKv(StoreKvEvent(author)), ); // Matching on the error here (instead of unwrapping) allows us to From 3090f79c166f0515608a479d44e3b33d02907ef6 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 29 Dec 2023 09:47:53 +0200 Subject: [PATCH 08/13] remove msg value from store kv event and change variable name for clarity --- solar/src/actors/muxrpc/history_stream.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/solar/src/actors/muxrpc/history_stream.rs b/solar/src/actors/muxrpc/history_stream.rs index f68cc02..406f5d7 100644 --- a/solar/src/actors/muxrpc/history_stream.rs +++ b/solar/src/actors/muxrpc/history_stream.rs @@ -85,11 +85,11 @@ where self.recv_error_response(api, *req_no, err).await } // Handle a broker message. - RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent((id, _msg)))) => { + RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) => { // Notification from the key-value store indicating that // a new message has just been appended to the feed - // identified by `id`. 
- return self.recv_storageevent_idchanged(api, id).await; + // identified by `ssb_id`. + return self.recv_storageevent_idchanged(api, ssb_id).await; } // Handle a timer event. RpcInput::Timer => self.on_timer(api).await, @@ -315,14 +315,14 @@ where async fn recv_storageevent_idchanged( &mut self, api: &mut ApiCaller, - id: &str, + ssb_id: &str, ) -> Result { // Attempt to remove the peer from the list of active streams. - if let Some(mut req) = self.reqs.remove(id) { + if let Some(mut req) = self.reqs.remove(ssb_id) { // Send local messages to the peer. self.send_history(api, &mut req).await?; // Reinsert the peer into the list of active streams. - self.reqs.insert(id.to_string(), req); + self.reqs.insert(ssb_id.to_string(), req); Ok(true) } else { Ok(false) From c782423faff74e2c49e726fe08e9761c113acbd3 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 29 Dec 2023 09:48:37 +0200 Subject: [PATCH 09/13] send partial vector clock when a feed is updated in the local store --- solar/src/actors/replication/ebt/manager.rs | 81 ++++++++++++--------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index a62b033..2f56bad 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -142,30 +142,30 @@ impl EbtManager { Ok(()) } - /// Retrieve the vector clock for the given SSB ID. - fn get_clock(&self, peer_id: &SsbId) -> Option { - if peer_id == &self.local_id { - Some(self.local_clock.to_owned()) - } else { - self.peer_clocks.get(peer_id).cloned() + /// Retrieve either the local vector clock or the stored vector clock + /// for the peer represented by the given SSB ID. + fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + match ssb_id { + Some(id) => self.peer_clocks.get(id).cloned(), + None => Some(self.local_clock.to_owned()), } } /// Set or update the vector clock for the given SSB ID. - fn set_clock(&mut self, peer_id: &SsbId, clock: VectorClock) { - if peer_id == &self.local_id { + fn set_clock(&mut self, ssb_id: &SsbId, clock: VectorClock) { + if ssb_id == &self.local_id { self.local_clock = clock } else { - self.peer_clocks.insert(peer_id.to_owned(), clock); + self.peer_clocks.insert(ssb_id.to_owned(), clock); } } /// Retrieve the stored vector clock for the first peer, check for the /// second peer in the vector clock and return the value of the receive /// flag. - fn is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { + fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { // Retrieve the vector clock for the first peer. - if let Some(clock) = self.get_clock(&peer_ssb_id) { + if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) { // Check if the second peer is represented in the vector clock. if let Some(encoded_seq_no) = clock.get(&ssb_id) { // Check if the receive flag is true. @@ -379,7 +379,7 @@ impl EbtManager { // If a clock is received without a prior EBT replicate // request having been received from the associated peer, it is // assumed that the clock was sent in response to a locally-sent - // EBT replicate request. Ie. The session was requested by the + // EBT replicate request. Ie. the session was requested by the // local peer. 
if !self.active_sessions.contains_key(&peer_ssb_id) { ch_broker @@ -392,6 +392,15 @@ impl EbtManager { )), )) .await?; + + let local_clock = self.local_clock.to_owned(); + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + )) + .await?; } self.set_clock(&peer_ssb_id, clock.to_owned()); @@ -483,32 +492,34 @@ impl EbtManager { Ok(()) } - async fn handle_local_store_updated(&self, ssb_id: SsbId, msg: Value) -> Result<()> { - // TODO: Beware of a possible race condition in this implementation. - // - // We are not considering the sequence number of the last message sent - // to any active sessions for the given feed (ie. `ssb_id`). - // - // This means that we may inadvertently end up sending out-of-order - // messages if this message is sent before the rest of the requested - // feed history. + /// Look up the latest sequence number for the updated feed, encode it as + /// the single entry of a vector clock and send that to any active session + /// peers. + async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> { + // Iterate over all active EBT sessions. + for (_peer_ssb_id, req_no) in self.active_sessions.iter() { + // Look up the latest sequence for the given ID. + if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? { + // Encode the replicate flag, receive flag and sequence. + let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?; + + // Update the entry for `ssb_id` in the local vector clock. + if let Some(mut local_clock) = self.get_clock(None) { + local_clock.insert(ssb_id.to_owned(), encoded_value); + } - // Create channel to send messages to broker. - let mut ch_broker = BROKER.lock().await.create_sender(); + // Create a vector clock with a single entry. + let mut updated_clock = HashMap::new(); + updated_clock.insert(ssb_id.to_owned(), encoded_value); - // Iterate over all active EBT sessions. - for (peer_ssb_id, req_no) in self.active_sessions.iter() { - // If `ssb_id` appears in the vector clock of `peer_ssb_id` and - // the receive flag is set to `true`, send the message. - if self.is_receiving(peer_ssb_id.to_owned(), ssb_id.to_owned())? { + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send the single-entry vector clock to the active session. ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::SendMessage( - *req_no, - peer_ssb_id.to_owned(), - msg.to_owned(), - )), + BrokerMessage::Ebt(EbtEvent::SendClock(*req_no, updated_clock)), )) .await?; } @@ -640,12 +651,12 @@ impl EbtManager { } } } - } else if let Some(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, msg)))) = msg { + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg { debug!("Received KV store event from broker"); // Respond to a key-value store state change for the given peer. // This is triggered when a new message is appended to the local feed. 
- if let Err(err) = self.handle_local_store_updated(ssb_id, msg).await { + if let Err(err) = self.handle_local_store_updated(ssb_id).await { error!("Error while handling 'local store updated' event: {}", err) } } From 5b9abd1d2c831086c2688c01c4f30919c0a8e6b0 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 4 Jan 2024 13:51:08 +0200 Subject: [PATCH 10/13] rename trace logging module to quieten ebt logs --- solar/src/actors/muxrpc/ebt.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index db682e8..285af15 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -47,7 +47,7 @@ where peer_ssb_id: String, active_request: Option, ) -> Result { - trace!(target: "ebt-handler", "Received MUXRPC input: {:?}", op); + trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op); // An outbound EBT replicate request was made before the handler was // called; add it to the map of active requests. From a46b3be3e483a022b01759ff39adcad681c0a58b Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 4 Jan 2024 13:58:18 +0200 Subject: [PATCH 11/13] track sent clocks and log messages when sending --- solar/src/actors/replication/ebt/manager.rs | 39 +++++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 2f56bad..e265b74 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -72,7 +72,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashMap, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -90,6 +90,11 @@ pub struct EbtManager { _requested_feeds: HashSet, /// Duration to wait for a connected peer to initiate an EBT session. session_wait_timeout: u64, + /// The latest vector clock sent for each session, identified by the + /// request number. + // + // TODO: Do we want to remove each entry when the session concludes? + sent_clocks: HashMap, /// The sequence number of the latest message sent to each peer /// for each requested feed. sent_messages: HashMap>, @@ -106,6 +111,7 @@ impl Default for EbtManager { peer_clocks: HashMap::new(), _requested_feeds: HashSet::new(), session_wait_timeout: 5, + sent_clocks: HashMap::new(), sent_messages: HashMap::new(), } } @@ -210,8 +216,9 @@ impl EbtManager { } /// Register a new EBT session for the given peer. 
- fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo) { - self.active_sessions.insert(peer_ssb_id.to_owned(), req_no); + fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + self.active_sessions + .insert(peer_ssb_id.to_owned(), (req_no, session_role)); trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); } @@ -341,7 +348,7 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id, req_no); + self.register_session(&peer_ssb_id, req_no, session_role.to_owned()); let local_clock = self.local_clock.to_owned(); match session_role { @@ -365,6 +372,10 @@ impl EbtManager { Ok(()) } + fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option { + self.sent_clocks.insert(req_no, clock) + } + async fn handle_received_clock( &mut self, req_no: ReqNo, @@ -373,6 +384,9 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Received vector clock: {:?}", clock); + // Update the stored vector clock for the remote peer. + self.set_clock(&peer_ssb_id, clock.to_owned()); + // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); @@ -392,9 +406,14 @@ impl EbtManager { )), )) .await?; + } + // If we have not previously sent a clock, send one now. + // + // This indicates that the local peer is acting as the session + // requester. + if self.sent_clocks.get(&req_no).is_none() { let local_clock = self.local_clock.to_owned(); - ch_broker .send(BrokerEvent::new( Destination::Broadcast, @@ -403,8 +422,6 @@ impl EbtManager { .await?; } - self.set_clock(&peer_ssb_id, clock.to_owned()); - // We want messages for all feeds in the clock, therefore the // `peer_ssb_id` parameter is set to `None`. let msgs = EbtManager::retrieve_requested_messages(None, clock).await?; @@ -497,7 +514,7 @@ impl EbtManager { /// peers. async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> { // Iterate over all active EBT sessions. - for (_peer_ssb_id, req_no) in self.active_sessions.iter() { + for (_peer_ssb_id, (req_no, _session_role)) in self.active_sessions.iter() { // Look up the latest sequence for the given ID. if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? { // Encode the replicate flag, receive flag and sequence. @@ -617,9 +634,9 @@ impl EbtManager { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(_, clock) => { + EbtEvent::SendClock(req_no, clock) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - // TODO: Update sent clocks. 
+ let _ = self.handle_send_clock(req_no, clock); } EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { @@ -632,7 +649,7 @@ impl EbtManager { } } EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { - trace!(target: "ebt-replication", "Sending message..."); + trace!(target: "ebt-replication", "Sending message: {:?}...", msg); if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { error!("Error while handling 'send message' event: {}", err) } From 07e74e0198876ae626f4330186b0b50cab713e2c Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 4 Jan 2024 14:24:20 +0200 Subject: [PATCH 12/13] fix message range retrieval and no-local-messages value encoding --- solar/src/actors/replication/ebt/manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index e265b74..6432f6a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -207,8 +207,8 @@ impl EbtManager { self.local_clock.insert(peer_id.to_owned(), encoded_value); } else { // No messages are stored in the local database for this feed. - // Set replicate flag to `true`, receive to `false` and `seq` to 0. - let encoded_value: EncodedClockValue = clock::encode(true, Some(false), Some(0))?; + // Set replicate flag to `true`, receive to `true` and `seq` to 0. + let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(0))?; self.local_clock.insert(peer_id.to_owned(), encoded_value); } @@ -252,7 +252,7 @@ impl EbtManager { if encoded_seq_no != -1 { if let (_replicate_flag, Some(true), Some(seq)) = clock::decode(encoded_seq_no)? { if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { - for n in seq..(last_seq + 1) { + for n in (seq + 1)..=last_seq { if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { messages.push(msg_kvt.value) } From ed447ee2c1c126186183dbb01cd279d347ce5a88 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 11 Jan 2024 08:58:24 +0200 Subject: [PATCH 13/13] remove redundant clock receipt --- solar/src/actors/muxrpc/ebt.rs | 63 +++++++++++++--------------------- 1 file changed, 23 insertions(+), 40 deletions(-) diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index 285af15..13ab8fd 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -88,8 +88,7 @@ where } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id) - .await + self.recv_rpc_response(ch_broker, *req_no, res).await } // Handle an incoming MUXRPC 'cancel stream' response. RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => { @@ -188,55 +187,39 @@ where Ok(false) } - /// Process an incoming MUXRPC response. The response is expected to - /// contain a vector clock or an SSB message. + /// Process an incoming MUXRPC response. + /// The response is expected to contain an SSB message. async fn recv_rpc_response( &mut self, ch_broker: &mut ChBrokerSend, req_no: ReqNo, res: &[u8], - peer_ssb_id: String, ) -> Result { + trace!(target: "ebt-handler", "Received RPC response: {}", req_no); + // Only handle the response if the associated request number is known // to us, either because we sent or received the initiating replicate // request. 
if self.active_requests.contains_key(&req_no) { - // The response may be a vector clock (aka. notes) or an SSB message. - // - // Since there is no explicit way to determine which was received, - // we first attempt deserialization of a vector clock and move on - // to attempting message deserialization if that fails. + // First try to deserialize the response into a message value. + // If that fails, try to deserialize into a message KVT and then + // convert that into a message value. Return an error if that fails. + // This approach allows us to handle the unlikely event that + // messages are sent as KVTs and not simply values. // - // TODO: Is matching on clock here redundant? - // We are already matching on `OtherRequest` in the handler. - if let Ok(clock) = serde_json::from_slice(res) { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), - )) - .await?; - } else { - // First try to deserialize the response into a message value. - // If that fails, try to deserialize into a message KVT and then - // convert that into a message value. Return an error if that fails. - // This approach allows us to handle the unlikely event that - // messages are sent as KVTs and not simply values. - // - // Validation of the message signature and fields is also performed - // as part of the call to `from_slice`. - let msg = match Message::from_slice(res) { - Ok(msg) => msg, - Err(_) => MessageKvt::from_slice(res)?.into_message()?, - }; - - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), - )) - .await?; - } + // Validation of the message signature and fields is also performed + // as part of the call to `from_slice`. + let msg = match Message::from_slice(res) { + Ok(msg) => msg, + Err(_) => MessageKvt::from_slice(res)?.into_message()?, + }; + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), + )) + .await?; } Ok(false)
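
The patches above lean on `clock::encode` and `clock::decode` to pack a replicate flag, a receive flag and a sequence number into a single i64 "note", with -1 meaning "do not replicate". The standalone sketch below shows the conventional ssb-ebt note layout (lowest bit clear when receiving, remaining bits carrying the sequence); solar's `clock` module is assumed to follow this convention, but the sketch is illustrative rather than a copy of it.

    // Hypothetical sketch of the EBT note encoding; not solar's actual
    // `clock` module.
    fn encode(replicate: bool, receive: Option<bool>, seq: Option<u64>) -> i64 {
        if !replicate {
            // A note of -1 advertises that the feed is not replicated.
            return -1;
        }
        let seq = seq.unwrap_or(0) as i64;
        // The lowest bit is 0 when the peer wants to receive the feed.
        let receive_bit = if receive.unwrap_or(true) { 0 } else { 1 };
        (seq << 1) | receive_bit
    }

    fn decode(note: i64) -> (bool, Option<bool>, Option<u64>) {
        if note == -1 {
            return (false, None, None);
        }
        (true, Some((note & 1) == 0), Some((note >> 1) as u64))
    }

    fn main() {
        // Replicating, receiving, seen up to sequence 3 => note 6.
        assert_eq!(encode(true, Some(true), Some(3)), 6);
        assert_eq!(decode(6), (true, Some(true), Some(3)));
        assert_eq!(decode(-1), (false, None, None));
    }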
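
The `_is_receiving` helper added to the manager reduces to a clock lookup plus a test of the receive bit. A compact sketch under the same note-layout assumption as above, with the clock types written out as plain aliases:

    use std::collections::HashMap;

    type SsbId = String;
    type VectorClock = HashMap<SsbId, i64>;

    // True if the stored clock for `peer_ssb_id` has an entry for `ssb_id`
    // whose receive flag is set (note != -1 and lowest bit clear).
    fn is_receiving(
        peer_clocks: &HashMap<SsbId, VectorClock>,
        peer_ssb_id: &str,
        ssb_id: &str,
    ) -> bool {
        peer_clocks
            .get(peer_ssb_id)
            .and_then(|clock| clock.get(ssb_id))
            .map(|note| *note != -1 && (*note & 1) == 0)
            .unwrap_or(false)
    }

    fn main() {
        let mut clock = VectorClock::new();
        clock.insert("@feed.ed25519".to_owned(), 6); // receiving, at sequence 3
        clock.insert("@muted.ed25519".to_owned(), -1); // not replicated

        let mut peer_clocks = HashMap::new();
        peer_clocks.insert("@peer.ed25519".to_owned(), clock);

        assert!(is_receiving(&peer_clocks, "@peer.ed25519", "@feed.ed25519"));
        assert!(!is_receiving(&peer_clocks, "@peer.ed25519", "@muted.ed25519"));
        assert!(!is_receiving(&peer_clocks, "@peer.ed25519", "@unknown.ed25519"));
    }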
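
Patch 02 introduces `sent_messages` as a nested map from peer ID to feed ID to the latest sent sequence, and patch 05 adds `_get_latest_sent_seq` on top of it. A minimal sketch of that bookkeeping, using the entry API to collapse the insert-or-update branch in `handle_send_message` into a single expression (the wrapper type and method names here are stand-ins, not solar's):

    use std::collections::HashMap;

    type SsbId = String;

    // Stand-in for the `sent_messages` field: peer ID -> feed ID -> sequence.
    #[derive(Default)]
    struct SentMessages(HashMap<SsbId, HashMap<SsbId, u64>>);

    impl SentMessages {
        // Record the latest sequence sent to `peer_ssb_id` for `msg_author`.
        fn record(&mut self, peer_ssb_id: &str, msg_author: &str, msg_sequence: u64) {
            self.0
                .entry(peer_ssb_id.to_owned())
                .or_default()
                .insert(msg_author.to_owned(), msg_sequence);
        }

        // Mirrors `_get_latest_sent_seq`.
        fn latest_sent(&self, peer_ssb_id: &str, feed_id: &str) -> Option<u64> {
            self.0.get(peer_ssb_id)?.get(feed_id).copied()
        }
    }

    fn main() {
        let mut sent = SentMessages::default();
        sent.record("@peer.ed25519", "@author.ed25519", 7);
        assert_eq!(sent.latest_sent("@peer.ed25519", "@author.ed25519"), Some(7));
        assert_eq!(sent.latest_sent("@peer.ed25519", "@other.ed25519"), None);
    }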
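
Patch 12 changes the retrieval range in `retrieve_latest_messages` from `seq..(last_seq + 1)` to `(seq + 1)..=last_seq`: the sequence carried in a peer's note is the latest message they already hold, so sending has to start one past it. A small worked example of the corrected range:

    // The peer already has everything up to `peer_has`; we hold messages up
    // to `local_latest`. Only the messages strictly after `peer_has` are sent.
    fn sequences_to_send(peer_has: u64, local_latest: u64) -> Vec<u64> {
        ((peer_has + 1)..=local_latest).collect()
    }

    fn main() {
        // Peer is at 3, we are at 6: send 4, 5 and 6.
        assert_eq!(sequences_to_send(3, 6), vec![4, 5, 6]);
        // Peer is already up to date: nothing to send (the previous
        // `seq..(last_seq + 1)` range would have resent message 6 here).
        assert!(sequences_to_send(6, 6).is_empty());
    }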
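
Patch 09 answers a local store update by sending each active session a vector clock containing a single entry for the changed feed. The sketch below models that fan-out as a pure function returning (request number, clock) pairs instead of broker events, uses the session-map shape from patch 05 (peer ID to request number) and takes the note value pre-encoded; all three are simplifications rather than solar's actual API:

    use std::collections::HashMap;

    type SsbId = String;
    type ReqNo = i32;
    type VectorClock = HashMap<SsbId, i64>;

    // For every active session, build a one-entry clock advertising the new
    // latest sequence (already encoded as a note) for `ssb_id`.
    fn clocks_for_active_sessions(
        active_sessions: &HashMap<SsbId, ReqNo>,
        ssb_id: &str,
        encoded_latest: i64,
    ) -> Vec<(ReqNo, VectorClock)> {
        active_sessions
            .values()
            .map(|req_no| {
                let mut clock = VectorClock::new();
                clock.insert(ssb_id.to_owned(), encoded_latest);
                (*req_no, clock)
            })
            .collect()
    }

    fn main() {
        let mut sessions = HashMap::new();
        sessions.insert("@peer-a.ed25519".to_owned(), 1);
        sessions.insert("@peer-b.ed25519".to_owned(), 2);

        let updates = clocks_for_active_sessions(&sessions, "@local.ed25519", 14);
        assert_eq!(updates.len(), 2);
        assert!(updates.iter().all(|(_, clock)| clock["@local.ed25519"] == 14));
    }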