Skip to content

Commit

Permalink
Merge pull request #73 from mycognosist/add_peer_method_for_scheduler
Browse files Browse the repository at this point in the history
Add `add_peer` and `remove_peer` methods to connection scheduler
  • Loading branch information
mycognosist authored Jun 8, 2023
2 parents 98dc1c8 + 88aa605 commit c8cdad1
Showing 1 changed file with 99 additions and 15 deletions.
114 changes: 99 additions & 15 deletions solar/src/actors/network/connection_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Connection Scheduler
//!
//! The scheduler takes a list of peers to connect to (including the SSB public key and an address
//! for each) and places all of them into an "eager" queue. Each peer is dialed, one by one, with
//! a delay of x seconds between each dial attempt.
//! Peers to be dialed are added to the scheduler, with the SSB public key and an address being
//! provided for each one. These peers are initially placed into an "eager" queue by the scheduler.
//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt.
//!
//! If the connection and handshake are successful, the peer is pushed to the back of the "eager"
//! queue once the connection is complete.
Expand Down Expand Up @@ -60,20 +60,43 @@ impl Default for ConnectionScheduler {
}

impl ConnectionScheduler {
/// Create a new connection scheduler and populate it with a list of peers
/// to dial.
fn new(peers: Vec<(PublicKey, String)>) -> Self {
ConnectionScheduler {
eager_peers: VecDeque::from(peers),
..Default::default()
/// Add a peer to the scheduler. The peer will be added to the queue of
/// eager peers for the initial dial attempt, as long as it is not
/// already present in either the eager or lazy queue.
fn add_peer(&mut self, peer: (PublicKey, String)) {
// Only insert the peer if it hasn't already been added to the queue
// of eager or lazy peers.
if !self.eager_peers.contains(&peer) & !self.lazy_peers.contains(&peer) {
self.eager_peers.push_back(peer)
}
}

/// Remove a peer from the scheduler, checking both the eager and lazy
/// queues.
fn _remove_peer(&mut self, peer: (PublicKey, String)) {
// First search the queue of eager peers for the given peer.
// If found, use the returned index to remove the peer.
if let Ok(index) = self
.eager_peers
.binary_search_by_key(&peer.0, |(key, _addr)| *key)
{
self.eager_peers.remove(index);
}

// Then search the queue of lazy peers and remove the peer if found.
if let Ok(index) = self
.lazy_peers
.binary_search_by_key(&peer.0, |(key, _addr)| *key)
{
self.lazy_peers.remove(index);
}
}
}

/// Start the connection scheduler.
///
/// Register the connection scheduler with the broker (as an actor), start
/// the eager and lazy dialers and listen for connection events emitted by
/// the eager and lazy dialers and listen for connection events emitted by
/// the connection manager. Update the eager and lazy peer queues according
/// to connection outcomes.
pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> {
Expand All @@ -90,9 +113,15 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
.register("connection-scheduler", true)
.await?;

// Create a new connection scheduler and populate it with a list of peers
// to dial.
let mut scheduler = ConnectionScheduler::new(peers);
// Create a new connection scheduler.
let mut scheduler = ConnectionScheduler::default();

// Populate the scheduler with the peers to be dialed.
// These peers are added to the queue of eager peers if they have not
// previously been added to the scheduler.
for peer in peers {
scheduler.add_peer(peer)
}

// Create the tickers (aka. metronomes) which will emit messages at
// the predetermined interval. These tickers control the rates at which
Expand All @@ -118,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
// Eager ticker emitted a tick.
eager_tick = eager_ticker.next() => {
if let Some(_tick) = eager_tick {
// Pop a peer from the list of eager peers.
// Pop a peer from the queue of eager peers.
if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
Expand All @@ -142,7 +171,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
// Lazy ticker emitted a tick.
lazy_tick = lazy_ticker.next() => {
if let Some(_tick) = lazy_tick {
// Pop a peer from the list of lazy peers.
// Pop a peer from the queue of lazy peers.
if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
Expand Down Expand Up @@ -216,3 +245,58 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)

Ok(())
}

#[cfg(test)]
mod test {
use super::*;

use kuska_ssb::crypto::ToSodiumObject;

#[async_std::test]
async fn test_add_and_remove_peers() -> Result<()> {
let mut connection_scheduler = ConnectionScheduler::default();

// Ensure the eager peers queue is empty.
assert!(connection_scheduler.eager_peers.len() == 0);

// Add a peer.
connection_scheduler.add_peer((
"QlQwWaj48J1Du5rHQXTPfifUFsPKLrOo6T5EfWfkqXU=.ed25519".to_ed25519_pk()?,
"ssb.mycelial.technology:8008".to_string(),
));
assert!(connection_scheduler.eager_peers.len() == 1);

// Add a second peer.
connection_scheduler.add_peer((
"HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?,
"127.0.0.1:8008".to_string(),
));
assert!(connection_scheduler.eager_peers.len() == 2);

// Attempt to add the first peer again. The queue length should not
// change (no duplicate entries permitted).
connection_scheduler.add_peer((
"HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?,
"127.0.0.1:8008".to_string(),
));
assert!(connection_scheduler.eager_peers.len() == 2);

// Remove the second peer.
connection_scheduler._remove_peer((
"HEqy940T6uB+T+d9Jaa58aNfRzLx9eRWqkZljBmnkmk=.ed25519".to_ed25519_pk()?,
"127.0.0.1:8008".to_string(),
));
assert!(connection_scheduler.eager_peers.len() == 1);

// Remove the first peer.
connection_scheduler._remove_peer((
"QlQwWaj48J1Du5rHQXTPfifUFsPKLrOo6T5EfWfkqXU=.ed25519".to_ed25519_pk()?,
"ssb.mycelial.technology:8008".to_string(),
));

// Ensure the eager peers queue is empty once again.
assert!(connection_scheduler.eager_peers.is_empty());

Ok(())
}
}

0 comments on commit c8cdad1

Please sign in to comment.