Skip to content

Commit

Permalink
Merge pull request #75 from mycognosist/extract_dialer_from_scheduler
Browse files Browse the repository at this point in the history
Extract dialer from scheduler
  • Loading branch information
mycognosist authored Jun 12, 2023
2 parents c8cdad1 + 3954a75 commit f0f39f8
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 66 deletions.
12 changes: 4 additions & 8 deletions solar/src/actors/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum TcpConnection {
/// The address of a remote peer.
addr: String,
/// The public key of a remote peer.
peer_public_key: ed25519::PublicKey,
public_key: ed25519::PublicKey,
},
/// An inbound TCP connection.
Listen { stream: TcpStream },
Expand Down Expand Up @@ -151,13 +151,10 @@ pub async fn actor_inner(
// Handle a TCP connection event (inbound or outbound).
let (stream, handshake) = match connection {
// Handle an outbound TCP connection event.
TcpConnection::Dial {
addr,
peer_public_key,
} => {
TcpConnection::Dial { addr, public_key } => {
// Update the data associated with this connection.
connection_data.peer_addr = Some(addr.to_owned());
connection_data.peer_public_key = Some(peer_public_key);
connection_data.peer_public_key = Some(public_key);

// Send 'connecting' connection event message via the broker.
ch_broker
Expand All @@ -180,8 +177,7 @@ pub async fn actor_inner(

// Attempt a secret handshake.
let handshake =
handshake_client(&mut stream, network_key.to_owned(), pk, sk, peer_public_key)
.await?;
handshake_client(&mut stream, network_key.to_owned(), pk, sk, public_key).await?;

info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id());

Expand Down
103 changes: 57 additions & 46 deletions solar/src/actors/network/connection_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,58 @@
//!
//! Peers to be dialed are added to the scheduler, with the SSB public key and an address being
//! provided for each one. These peers are initially placed into an "eager" queue by the scheduler.
//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt.
//! A dial request is emitted for each peer in the "eager" queue, one by one, with a delay of 5
//! seconds between each request.
//!
//! If the connection and handshake are successful, the peer is pushed to the back of the "eager"
//! queue once the connection is complete.
//!
//! If the connection or handshake are unsuccessful, the peer is pushed to the back of the "lazy"
//! queue once the connection is complete.
//!
//! Each peer in the "lazy" queue is dialed, one by one, with a delay of x * 10 seconds between
//! each dial attempt.
//! A dial request is emitted for each peer in the "lazy" queue, one by one, with a delay of 61
//! seconds between each request.
//!
//! The success or failure of each dial attempt is determined by listening to connection events from
//! the connection manager. This allows peers to be moved between queues when required.
use std::{collections::VecDeque, time::Duration};
use std::{collections::VecDeque, fmt::Display, time::Duration};

use async_std::stream;
use futures::{select_biased, stream::StreamExt, FutureExt};
use kuska_ssb::crypto::ed25519::PublicKey;
use futures::{select_biased, stream::StreamExt, FutureExt, SinkExt};
use kuska_ssb::crypto::{ed25519::PublicKey, ToSsbId};
use log::debug;

use crate::{
actors::network::{
connection,
connection::TcpConnection,
connection_manager::{ConnectionEvent, CONNECTION_MANAGER},
},
broker::{ActorEndpoint, Broker, BROKER},
config::SECRET_CONFIG,
actors::network::connection_manager::{ConnectionEvent, CONNECTION_MANAGER},
broker::{ActorEndpoint, BrokerEvent, Destination, BROKER},
Result,
};

/// A request to dial the peer identified by the given public key and address.
pub struct DialRequest(pub (PublicKey, String));

// Custom `Display` implementation so we can easily log dial requests.
impl Display for DialRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (peer_public_key, peer_addr) = match &self {
DialRequest((key, addr)) => {
let ssb_id = key.to_ssb_id();
let peer_public_key = if ssb_id.starts_with('@') {
ssb_id
} else {
format!("@{}", ssb_id)
};

let peer_addr = addr.to_string();

(peer_public_key, peer_addr)
}
};

write!(f, "<DialRequest {} / {}>", peer_public_key, peer_addr)
}
}

#[derive(Debug)]
struct ConnectionScheduler {
/// Peers with whom the last connection attempt was successful.
Expand All @@ -44,7 +66,7 @@ struct ConnectionScheduler {
/// Defaults to 5 seconds.
eager_interval: Duration,
/// The interval in seconds between dial attempts for lazy peers.
/// Defaults to 60 seconds.
/// Defaults to 61 seconds.
lazy_interval: Duration,
}

Expand All @@ -54,7 +76,7 @@ impl Default for ConnectionScheduler {
eager_peers: VecDeque::new(),
lazy_peers: VecDeque::new(),
eager_interval: Duration::from_secs(5),
lazy_interval: Duration::from_secs(60),
lazy_interval: Duration::from_secs(61),
}
}
}
Expand Down Expand Up @@ -96,14 +118,14 @@ impl ConnectionScheduler {
/// Start the connection scheduler.
///
/// Register the connection scheduler with the broker (as an actor), start
/// the eager and lazy dialers and listen for connection events emitted by
/// the connection manager. Update the eager and lazy peer queues according
/// to connection outcomes.
pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> {
/// the eager and lazy dial request emitters and listen for connection events
/// emitted by the connection manager. Update the eager and lazy peer queues
/// according to connection outcomes.
pub async fn actor(peers: Vec<(PublicKey, String)>) -> Result<()> {
// Register the connection scheduler actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_broker: _,
mut ch_broker,
ch_msg,
actor_id: _,
..
Expand All @@ -125,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)

// Create the tickers (aka. metronomes) which will emit messages at
// the predetermined interval. These tickers control the rates at which
// we dial peers.
// we emit dial requests for peers.
let mut eager_ticker = stream::interval(scheduler.eager_interval).fuse();
let mut lazy_ticker = stream::interval(scheduler.lazy_interval).fuse();

Expand All @@ -148,22 +170,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
eager_tick = eager_ticker.next() => {
if let Some(_tick) = eager_tick {
// Pop a peer from the queue of eager peers.
if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() {
if let Some((public_key, addr)) = scheduler.eager_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) {
scheduler.eager_peers.push_back((peer_public_key, addr))
if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) {
scheduler.eager_peers.push_back((public_key, addr))
} else {
// Otherwise, dial the peer.
Broker::spawn(connection::actor(
// TODO: make this neater once config-sharing story has improved.
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr,
peer_public_key,
},
selective_replication,
));
// Otherwise, send a dial request to the dialer.
let dial_request = DialRequest((public_key, addr));
debug!("{}", dial_request);
ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await?
}
}
}
Expand All @@ -172,21 +188,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
lazy_tick = lazy_ticker.next() => {
if let Some(_tick) = lazy_tick {
// Pop a peer from the queue of lazy peers.
if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() {
if let Some((public_key, addr)) = scheduler.lazy_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) {
scheduler.eager_peers.push_back((peer_public_key, addr))
if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) {
scheduler.eager_peers.push_back((public_key, addr))
} else {
// Otherwise, dial the peer.
Broker::spawn(connection::actor(
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr,
peer_public_key,
},
selective_replication,
));
// Otherwise, send a dial request to the dialer.
let dial_request = DialRequest((public_key, addr));
debug!("{}", dial_request);
ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await?
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions solar/src/actors/network/dialer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Dialer
//!
//! Dial requests are received from the connection scheduler via the broker
//! message bus. Each request includes the public key and address of the peer
//! to be dialed. Upon receiving a request, the dialer spawns the connection actor.
use futures::{select_biased, FutureExt, StreamExt};

use crate::{
actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest},
broker::{ActorEndpoint, Broker, BROKER},
config::SECRET_CONFIG,
Result,
};

/// Start the dialer.
///
/// Register the connection dialer with the broker (as an actor) and listen
/// for dial requests from the scheduler. Once received, use the attached
/// public key and outbound address to dial the peer by spawning the connection
/// actor.
pub async fn actor(selective_replication: bool) -> Result<()> {
// Register the connection dialer actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_broker: _,
ch_msg,
actor_id: _,
..
} = BROKER.lock().await.register("dialer", true).await?;

// Fuse internal termination channel with external channel.
// This allows termination of the dialer loop to be initiated from
// outside this function.
let mut ch_terminate_fuse = ch_terminate.fuse();

let mut broker_msg_ch = ch_msg.unwrap();

// Listen for dial request events via the broker message bus and dial peers.
loop {
select_biased! {
// Received termination signal. Break out of the loop.
_value = ch_terminate_fuse => {
break;
},
// Received a message from the connection scheduler via the broker.
msg = broker_msg_ch.next().fuse() => {
if let Some(msg) = msg {
if let Some(DialRequest((public_key, addr))) = msg.downcast_ref::<DialRequest>() {
Broker::spawn(connection::actor(
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr: addr.to_string(),
public_key: *public_key,
},
selective_replication,
));
}
}
}
}
}

Ok(())
}
7 changes: 2 additions & 5 deletions solar/src/actors/network/lan_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,13 @@ async fn process_broadcast(

// Attempt to parse the IP / hostname, port and public key from the received
// UDP broadcast message.
if let Some((server, port, peer_public_key)) = LanBroadcast::parse(&msg) {
if let Some((server, port, public_key)) = LanBroadcast::parse(&msg) {
let addr = format!("{server}:{port}");

// Spawn a connection actor with the given connection parameters.
Broker::spawn(connection::actor(
server_id.clone(),
TcpConnection::Dial {
addr,
peer_public_key,
},
TcpConnection::Dial { addr, public_key },
selective_replication,
));
} else {
Expand Down
1 change: 1 addition & 0 deletions solar/src/actors/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod config;
pub mod connection;
pub mod connection_manager;
pub mod connection_scheduler;
pub mod dialer;
pub mod lan_discovery;
pub mod tcp_server;
17 changes: 10 additions & 7 deletions solar/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
actors::{
jsonrpc,
network::{
connection_manager::CONNECTION_MANAGER, connection_scheduler, lan_discovery, tcp_server,
connection_manager::CONNECTION_MANAGER, connection_scheduler, dialer, lan_discovery,
tcp_server,
},
},
broker::*,
Expand Down Expand Up @@ -120,12 +121,14 @@ impl Node {
// Add any connection details supplied via the `--connect` CLI option.
peers_to_dial.extend(config.network.connect);

// Spawn the connection scheduler actor. Dials remote peers on an
// ongoing basis (at `eager` or `lazy` intervals).
Broker::spawn(connection_scheduler::actor(
peers_to_dial,
config.replication.selective,
));
// Spawn the connection dialer actor. Dials remote peers as dial
// requests are received from the connection scheduler.
Broker::spawn(dialer::actor(config.replication.selective));

// Spawn the connection scheduler actor. Sends dial requests to the
// dialer for remote peers on an ongoing basis (at `eager` or `lazy`
// intervals).
Broker::spawn(connection_scheduler::actor(peers_to_dial));

// Spawn the connection manager message loop.
let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop();
Expand Down

0 comments on commit f0f39f8

Please sign in to comment.