Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract dialer from scheduler #75

Merged
merged 6 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions solar/src/actors/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum TcpConnection {
/// The address of a remote peer.
addr: String,
/// The public key of a remote peer.
peer_public_key: ed25519::PublicKey,
public_key: ed25519::PublicKey,
},
/// An inbound TCP connection.
Listen { stream: TcpStream },
Expand Down Expand Up @@ -151,13 +151,10 @@ pub async fn actor_inner(
// Handle a TCP connection event (inbound or outbound).
let (stream, handshake) = match connection {
// Handle an outbound TCP connection event.
TcpConnection::Dial {
addr,
peer_public_key,
} => {
TcpConnection::Dial { addr, public_key } => {
// Update the data associated with this connection.
connection_data.peer_addr = Some(addr.to_owned());
connection_data.peer_public_key = Some(peer_public_key);
connection_data.peer_public_key = Some(public_key);

// Send 'connecting' connection event message via the broker.
ch_broker
Expand All @@ -180,8 +177,7 @@ pub async fn actor_inner(

// Attempt a secret handshake.
let handshake =
handshake_client(&mut stream, network_key.to_owned(), pk, sk, peer_public_key)
.await?;
handshake_client(&mut stream, network_key.to_owned(), pk, sk, public_key).await?;

info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id());

Expand Down
103 changes: 57 additions & 46 deletions solar/src/actors/network/connection_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,58 @@
//!
//! Peers to be dialed are added to the scheduler, with the SSB public key and an address being
//! provided for each one. These peers are initially placed into an "eager" queue by the scheduler.
//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt.
//! A dial request is emitted for each peer in the "eager" queue, one by one, with a delay of 5
//! seconds between each request.
//!
//! If the connection and handshake are successful, the peer is pushed to the back of the "eager"
//! queue once the connection is complete.
//!
//! If the connection or handshake are unsuccessful, the peer is pushed to the back of the "lazy"
//! queue once the connection is complete.
//!
//! Each peer in the "lazy" queue is dialed, one by one, with a delay of x * 10 seconds between
//! each dial attempt.
//! A dial request is emitted for each peer in the "lazy" queue, one by one, with a delay of 61
//! seconds between each request.
//!
//! The success or failure of each dial attempt is determined by listening to connection events from
//! the connection manager. This allows peers to be moved between queues when required.
use std::{collections::VecDeque, time::Duration};
use std::{collections::VecDeque, fmt::Display, time::Duration};

use async_std::stream;
use futures::{select_biased, stream::StreamExt, FutureExt};
use kuska_ssb::crypto::ed25519::PublicKey;
use futures::{select_biased, stream::StreamExt, FutureExt, SinkExt};
use kuska_ssb::crypto::{ed25519::PublicKey, ToSsbId};
use log::debug;

use crate::{
actors::network::{
connection,
connection::TcpConnection,
connection_manager::{ConnectionEvent, CONNECTION_MANAGER},
},
broker::{ActorEndpoint, Broker, BROKER},
config::SECRET_CONFIG,
actors::network::connection_manager::{ConnectionEvent, CONNECTION_MANAGER},
broker::{ActorEndpoint, BrokerEvent, Destination, BROKER},
Result,
};

/// A request to dial the peer identified by the given public key and address.
pub struct DialRequest(pub (PublicKey, String));

// Custom `Display` implementation so we can easily log dial requests.
impl Display for DialRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (peer_public_key, peer_addr) = match &self {
DialRequest((key, addr)) => {
let ssb_id = key.to_ssb_id();
let peer_public_key = if ssb_id.starts_with('@') {
ssb_id
} else {
format!("@{}", ssb_id)
};

let peer_addr = addr.to_string();

(peer_public_key, peer_addr)
}
};

write!(f, "<DialRequest {} / {}>", peer_public_key, peer_addr)
}
}

#[derive(Debug)]
struct ConnectionScheduler {
/// Peers with whom the last connection attempt was successful.
Expand All @@ -44,7 +66,7 @@ struct ConnectionScheduler {
/// Defaults to 5 seconds.
eager_interval: Duration,
/// The interval in seconds between dial attempts for lazy peers.
/// Defaults to 60 seconds.
/// Defaults to 61 seconds.
lazy_interval: Duration,
}

Expand All @@ -54,7 +76,7 @@ impl Default for ConnectionScheduler {
eager_peers: VecDeque::new(),
lazy_peers: VecDeque::new(),
eager_interval: Duration::from_secs(5),
lazy_interval: Duration::from_secs(60),
lazy_interval: Duration::from_secs(61),
}
}
}
Expand Down Expand Up @@ -96,14 +118,14 @@ impl ConnectionScheduler {
/// Start the connection scheduler.
///
/// Register the connection scheduler with the broker (as an actor), start
/// the eager and lazy dialers and listen for connection events emitted by
/// the connection manager. Update the eager and lazy peer queues according
/// to connection outcomes.
pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> {
/// the eager and lazy dial request emitters and listen for connection events
/// emitted by the connection manager. Update the eager and lazy peer queues
/// according to connection outcomes.
pub async fn actor(peers: Vec<(PublicKey, String)>) -> Result<()> {
// Register the connection scheduler actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_broker: _,
mut ch_broker,
ch_msg,
actor_id: _,
..
Expand All @@ -125,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)

// Create the tickers (aka. metronomes) which will emit messages at
// the predetermined interval. These tickers control the rates at which
// we dial peers.
// we emit dial requests for peers.
let mut eager_ticker = stream::interval(scheduler.eager_interval).fuse();
let mut lazy_ticker = stream::interval(scheduler.lazy_interval).fuse();

Expand All @@ -148,22 +170,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
eager_tick = eager_ticker.next() => {
if let Some(_tick) = eager_tick {
// Pop a peer from the queue of eager peers.
if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() {
if let Some((public_key, addr)) = scheduler.eager_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) {
scheduler.eager_peers.push_back((peer_public_key, addr))
if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) {
scheduler.eager_peers.push_back((public_key, addr))
} else {
// Otherwise, dial the peer.
Broker::spawn(connection::actor(
// TODO: make this neater once config-sharing story has improved.
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr,
peer_public_key,
},
selective_replication,
));
// Otherwise, send a dial request to the dialer.
let dial_request = DialRequest((public_key, addr));
debug!("{}", dial_request);
ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await?
}
}
}
Expand All @@ -172,21 +188,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool)
lazy_tick = lazy_ticker.next() => {
if let Some(_tick) = lazy_tick {
// Pop a peer from the queue of lazy peers.
if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() {
if let Some((public_key, addr)) = scheduler.lazy_peers.pop_front() {
// Check if we're already connected to this peer. If so,
// push them to the back of the eager queue.
if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) {
scheduler.eager_peers.push_back((peer_public_key, addr))
if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) {
scheduler.eager_peers.push_back((public_key, addr))
} else {
// Otherwise, dial the peer.
Broker::spawn(connection::actor(
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr,
peer_public_key,
},
selective_replication,
));
// Otherwise, send a dial request to the dialer.
let dial_request = DialRequest((public_key, addr));
debug!("{}", dial_request);
ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await?
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions solar/src/actors/network/dialer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Dialer
//!
//! Dial requests are received from the connection scheduler via the broker
//! message bus. Each request includes the public key and address of the peer
//! to be dialed. Upon receiving a request, the dialer spawns the connection actor.
use futures::{select_biased, FutureExt, StreamExt};

use crate::{
actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest},
broker::{ActorEndpoint, Broker, BROKER},
config::SECRET_CONFIG,
Result,
};

/// Start the dialer.
///
/// Register the connection dialer with the broker (as an actor) and listen
/// for dial requests from the scheduler. Once received, use the attached
/// public key and outbound address to dial the peer by spawning the connection
/// actor.
pub async fn actor(selective_replication: bool) -> Result<()> {
// Register the connection dialer actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_broker: _,
ch_msg,
actor_id: _,
..
} = BROKER.lock().await.register("dialer", true).await?;

// Fuse internal termination channel with external channel.
// This allows termination of the dialer loop to be initiated from
// outside this function.
let mut ch_terminate_fuse = ch_terminate.fuse();

let mut broker_msg_ch = ch_msg.unwrap();

// Listen for dial request events via the broker message bus and dial peers.
loop {
select_biased! {
// Received termination signal. Break out of the loop.
_value = ch_terminate_fuse => {
break;
},
// Received a message from the connection scheduler via the broker.
msg = broker_msg_ch.next().fuse() => {
if let Some(msg) = msg {
if let Some(DialRequest((public_key, addr))) = msg.downcast_ref::<DialRequest>() {
Broker::spawn(connection::actor(
SECRET_CONFIG.get().unwrap().to_owned_identity()?,
TcpConnection::Dial {
addr: addr.to_string(),
public_key: *public_key,
},
selective_replication,
));
}
}
}
}
}

Ok(())
}
7 changes: 2 additions & 5 deletions solar/src/actors/network/lan_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,13 @@ async fn process_broadcast(

// Attempt to parse the IP / hostname, port and public key from the received
// UDP broadcast message.
if let Some((server, port, peer_public_key)) = LanBroadcast::parse(&msg) {
if let Some((server, port, public_key)) = LanBroadcast::parse(&msg) {
let addr = format!("{server}:{port}");

// Spawn a connection actor with the given connection parameters.
Broker::spawn(connection::actor(
server_id.clone(),
TcpConnection::Dial {
addr,
peer_public_key,
},
TcpConnection::Dial { addr, public_key },
selective_replication,
));
} else {
Expand Down
1 change: 1 addition & 0 deletions solar/src/actors/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod config;
pub mod connection;
pub mod connection_manager;
pub mod connection_scheduler;
pub mod dialer;
pub mod lan_discovery;
pub mod tcp_server;
17 changes: 10 additions & 7 deletions solar/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
actors::{
jsonrpc,
network::{
connection_manager::CONNECTION_MANAGER, connection_scheduler, lan_discovery, tcp_server,
connection_manager::CONNECTION_MANAGER, connection_scheduler, dialer, lan_discovery,
tcp_server,
},
},
broker::*,
Expand Down Expand Up @@ -120,12 +121,14 @@ impl Node {
// Add any connection details supplied via the `--connect` CLI option.
peers_to_dial.extend(config.network.connect);

// Spawn the connection scheduler actor. Dials remote peers on an
// ongoing basis (at `eager` or `lazy` intervals).
Broker::spawn(connection_scheduler::actor(
peers_to_dial,
config.replication.selective,
));
// Spawn the connection dialer actor. Dials remote peers as dial
// requests are received from the connection scheduler.
Broker::spawn(dialer::actor(config.replication.selective));

// Spawn the connection scheduler actor. Sends dial requests to the
// dialer for remote peers on an ongoing basis (at `eager` or `lazy`
// intervals).
Broker::spawn(connection_scheduler::actor(peers_to_dial));

// Spawn the connection manager message loop.
let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop();
Expand Down