diff --git a/solar/src/actors/network/connection_scheduler.rs b/solar/src/actors/network/connection_scheduler.rs index e384200..601486b 100644 --- a/solar/src/actors/network/connection_scheduler.rs +++ b/solar/src/actors/network/connection_scheduler.rs @@ -1,8 +1,8 @@ //! Connection Scheduler //! -//! The scheduler takes a list of peers to connect to (including the SSB public key and an address -//! for each) and places all of them into an "eager" queue. Each peer is dialed, one by one, with -//! a delay of x seconds between each dial attempt. +//! Peers to be dialed are added to the scheduler, with the SSB public key and an address being +//! provided for each one. These peers are initially placed into an "eager" queue by the scheduler. +//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt. //! //! If the connection and handshake are successful, the peer is pushed to the back of the "eager" //! queue once the connection is complete. @@ -60,12 +60,35 @@ impl Default for ConnectionScheduler { } impl ConnectionScheduler { - /// Create a new connection scheduler and populate it with a list of peers - /// to dial. - fn new(peers: Vec<(PublicKey, String)>) -> Self { - ConnectionScheduler { - eager_peers: VecDeque::from(peers), - ..Default::default() + /// Add a peer to the scheduler. The peer will be added to the queue of + /// eager peers for the initial dial attempt, as long as it is not + /// already present in either the eager or lazy queue. + fn add_peer(&mut self, peer: (PublicKey, String)) { + // Only insert the peer if it hasn't already been added to the queue + // of eager or lazy peers. + if !self.eager_peers.contains(&peer) & !self.lazy_peers.contains(&peer) { + self.eager_peers.push_back(peer) + } + } + + /// Remove a peer from the scheduler, checking both the eager and lazy + /// queues. + fn _remove_peer(&mut self, peer: (PublicKey, String)) { + // First search the queue of eager peers for the given peer. + // If found, use the returned index to remove the peer. + if let Ok(index) = self + .eager_peers + .binary_search_by_key(&peer.0, |(key, _addr)| *key) + { + self.eager_peers.remove(index); + } + + // Then search the queue of lazy peers and remove the peer if found. + if let Ok(index) = self + .lazy_peers + .binary_search_by_key(&peer.0, |(key, _addr)| *key) + { + self.lazy_peers.remove(index); } } } @@ -73,7 +96,7 @@ impl ConnectionScheduler { /// Start the connection scheduler. /// /// Register the connection scheduler with the broker (as an actor), start -/// the eager and lazy dialers and listen for connection events emitted by +/// the eager and lazy dialers and listen for connection events emitted by /// the connection manager. Update the eager and lazy peer queues according /// to connection outcomes. pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> { @@ -90,9 +113,15 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) .register("connection-scheduler", true) .await?; - // Create a new connection scheduler and populate it with a list of peers - // to dial. - let mut scheduler = ConnectionScheduler::new(peers); + // Create a new connection scheduler. + let mut scheduler = ConnectionScheduler::default(); + + // Populate the scheduler with the peers to be dialed. + // These peers are added to the queue of eager peers if they have not + // previously been added to the scheduler. + for peer in peers { + scheduler.add_peer(peer) + } // Create the tickers (aka. metronomes) which will emit messages at // the predetermined interval. These tickers control the rates at which @@ -118,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) // Eager ticker emitted a tick. eager_tick = eager_ticker.next() => { if let Some(_tick) = eager_tick { - // Pop a peer from the list of eager peers. + // Pop a peer from the queue of eager peers. if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. @@ -142,7 +171,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) // Lazy ticker emitted a tick. lazy_tick = lazy_ticker.next() => { if let Some(_tick) = lazy_tick { - // Pop a peer from the list of lazy peers. + // Pop a peer from the queue of lazy peers. if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. @@ -216,3 +245,58 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) Ok(()) } + +#[cfg(test)] +mod test { + use super::*; + + use kuska_ssb::crypto::ToSodiumObject; + + #[async_std::test] + async fn test_add_and_remove_peers() -> Result<()> { + let mut connection_scheduler = ConnectionScheduler::default(); + + // Ensure the eager peers queue is empty. + assert!(connection_scheduler.eager_peers.len() == 0); + + // Add a peer. + connection_scheduler.add_peer(( + "QlQwWaj48J1Du5rHQXTPfifUFsPKLrOo6T5EfWfkqXU=.ed25519".to_ed25519_pk()?, + "ssb.mycelial.technology:8008".to_string(), + )); + assert!(connection_scheduler.eager_peers.len() == 1); + + // Add a second peer. + connection_scheduler.add_peer(( + "HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?, + "127.0.0.1:8008".to_string(), + )); + assert!(connection_scheduler.eager_peers.len() == 2); + + // Attempt to add the first peer again. The queue length should not + // change (no duplicate entries permitted). + connection_scheduler.add_peer(( + "HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?, + "127.0.0.1:8008".to_string(), + )); + assert!(connection_scheduler.eager_peers.len() == 2); + + // Remove the second peer. + connection_scheduler._remove_peer(( + "HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?, + "127.0.0.1:8008".to_string(), + )); + assert!(connection_scheduler.eager_peers.len() == 1); + + // Remove the first peer. + connection_scheduler._remove_peer(( + "QlQwWaj48J1Du5rHQXTPfifUFsPKLrOo6T5EfWfkqXU=.ed25519".to_ed25519_pk()?, + "ssb.mycelial.technology:8008".to_string(), + )); + + // Ensure the eager peers queue is empty once again. + assert!(connection_scheduler.eager_peers.is_empty()); + + Ok(()) + } +}