diff --git a/solar/src/actors/network/connection.rs b/solar/src/actors/network/connection.rs index bb8d885..aa74276 100644 --- a/solar/src/actors/network/connection.rs +++ b/solar/src/actors/network/connection.rs @@ -23,7 +23,7 @@ pub enum TcpConnection { /// The address of a remote peer. addr: String, /// The public key of a remote peer. - peer_public_key: ed25519::PublicKey, + public_key: ed25519::PublicKey, }, /// An inbound TCP connection. Listen { stream: TcpStream }, @@ -151,13 +151,10 @@ pub async fn actor_inner( // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connection { // Handle an outbound TCP connection event. - TcpConnection::Dial { - addr, - peer_public_key, - } => { + TcpConnection::Dial { addr, public_key } => { // Update the data associated with this connection. connection_data.peer_addr = Some(addr.to_owned()); - connection_data.peer_public_key = Some(peer_public_key); + connection_data.peer_public_key = Some(public_key); // Send 'connecting' connection event message via the broker. ch_broker @@ -180,8 +177,7 @@ pub async fn actor_inner( // Attempt a secret handshake. let handshake = - handshake_client(&mut stream, network_key.to_owned(), pk, sk, peer_public_key) - .await?; + handshake_client(&mut stream, network_key.to_owned(), pk, sk, public_key).await?; info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id()); diff --git a/solar/src/actors/network/connection_scheduler.rs b/solar/src/actors/network/connection_scheduler.rs index 601486b..6d1c618 100644 --- a/solar/src/actors/network/connection_scheduler.rs +++ b/solar/src/actors/network/connection_scheduler.rs @@ -2,7 +2,8 @@ //! //! Peers to be dialed are added to the scheduler, with the SSB public key and an address being //! provided for each one. These peers are initially placed into an "eager" queue by the scheduler. -//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt. +//! A dial request is emitted for each peer in the "eager" queue, one by one, with a delay of 5 +//! seconds between each request. //! //! If the connection and handshake are successful, the peer is pushed to the back of the "eager" //! queue once the connection is complete. @@ -10,28 +11,49 @@ //! If the connection or handshake are unsuccessful, the peer is pushed to the back of the "lazy" //! queue once the connection is complete. //! -//! Each peer in the "lazy" queue is dialed, one by one, with a delay of x * 10 seconds between -//! each dial attempt. +//! A dial request is emitted for each peer in the "lazy" queue, one by one, with a delay of 61 +//! seconds between each request. //! //! The success or failure of each dial attempt is determined by listening to connection events from //! the connection manager. This allows peers to be moved between queues when required. -use std::{collections::VecDeque, time::Duration}; +use std::{collections::VecDeque, fmt::Display, time::Duration}; use async_std::stream; -use futures::{select_biased, stream::StreamExt, FutureExt}; -use kuska_ssb::crypto::ed25519::PublicKey; +use futures::{select_biased, stream::StreamExt, FutureExt, SinkExt}; +use kuska_ssb::crypto::{ed25519::PublicKey, ToSsbId}; +use log::debug; use crate::{ - actors::network::{ - connection, - connection::TcpConnection, - connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, - }, - broker::{ActorEndpoint, Broker, BROKER}, - config::SECRET_CONFIG, + actors::network::connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, + broker::{ActorEndpoint, BrokerEvent, Destination, BROKER}, Result, }; +/// A request to dial the peer identified by the given public key and address. +pub struct DialRequest(pub (PublicKey, String)); + +// Custom `Display` implementation so we can easily log dial requests. +impl Display for DialRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (peer_public_key, peer_addr) = match &self { + DialRequest((key, addr)) => { + let ssb_id = key.to_ssb_id(); + let peer_public_key = if ssb_id.starts_with('@') { + ssb_id + } else { + format!("@{}", ssb_id) + }; + + let peer_addr = addr.to_string(); + + (peer_public_key, peer_addr) + } + }; + + write!(f, "", peer_public_key, peer_addr) + } +} + #[derive(Debug)] struct ConnectionScheduler { /// Peers with whom the last connection attempt was successful. @@ -44,7 +66,7 @@ struct ConnectionScheduler { /// Defaults to 5 seconds. eager_interval: Duration, /// The interval in seconds between dial attempts for lazy peers. - /// Defaults to 60 seconds. + /// Defaults to 61 seconds. lazy_interval: Duration, } @@ -54,7 +76,7 @@ impl Default for ConnectionScheduler { eager_peers: VecDeque::new(), lazy_peers: VecDeque::new(), eager_interval: Duration::from_secs(5), - lazy_interval: Duration::from_secs(60), + lazy_interval: Duration::from_secs(61), } } } @@ -96,14 +118,14 @@ impl ConnectionScheduler { /// Start the connection scheduler. /// /// Register the connection scheduler with the broker (as an actor), start -/// the eager and lazy dialers and listen for connection events emitted by -/// the connection manager. Update the eager and lazy peer queues according -/// to connection outcomes. -pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> { +/// the eager and lazy dial request emitters and listen for connection events +/// emitted by the connection manager. Update the eager and lazy peer queues +/// according to connection outcomes. +pub async fn actor(peers: Vec<(PublicKey, String)>) -> Result<()> { // Register the connection scheduler actor with the broker. let ActorEndpoint { ch_terminate, - ch_broker: _, + mut ch_broker, ch_msg, actor_id: _, .. @@ -125,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) // Create the tickers (aka. metronomes) which will emit messages at // the predetermined interval. These tickers control the rates at which - // we dial peers. + // we emit dial requests for peers. let mut eager_ticker = stream::interval(scheduler.eager_interval).fuse(); let mut lazy_ticker = stream::interval(scheduler.lazy_interval).fuse(); @@ -148,22 +170,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) eager_tick = eager_ticker.next() => { if let Some(_tick) = eager_tick { // Pop a peer from the queue of eager peers. - if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() { + if let Some((public_key, addr)) = scheduler.eager_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. - if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) { - scheduler.eager_peers.push_back((peer_public_key, addr)) + if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { + scheduler.eager_peers.push_back((public_key, addr)) } else { - // Otherwise, dial the peer. - Broker::spawn(connection::actor( - // TODO: make this neater once config-sharing story has improved. - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr, - peer_public_key, - }, - selective_replication, - )); + // Otherwise, send a dial request to the dialer. + let dial_request = DialRequest((public_key, addr)); + debug!("{}", dial_request); + ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await? } } } @@ -172,21 +188,16 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) lazy_tick = lazy_ticker.next() => { if let Some(_tick) = lazy_tick { // Pop a peer from the queue of lazy peers. - if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() { + if let Some((public_key, addr)) = scheduler.lazy_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. - if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) { - scheduler.eager_peers.push_back((peer_public_key, addr)) + if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { + scheduler.eager_peers.push_back((public_key, addr)) } else { - // Otherwise, dial the peer. - Broker::spawn(connection::actor( - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr, - peer_public_key, - }, - selective_replication, - )); + // Otherwise, send a dial request to the dialer. + let dial_request = DialRequest((public_key, addr)); + debug!("{}", dial_request); + ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await? } } } diff --git a/solar/src/actors/network/dialer.rs b/solar/src/actors/network/dialer.rs new file mode 100644 index 0000000..2e91100 --- /dev/null +++ b/solar/src/actors/network/dialer.rs @@ -0,0 +1,64 @@ +//! Dialer +//! +//! Dial requests are received from the connection scheduler via the broker +//! message bus. Each request includes the public key and address of the peer +//! to be dialed. Upon receiving a request, the dialer spawns the connection actor. +use futures::{select_biased, FutureExt, StreamExt}; + +use crate::{ + actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest}, + broker::{ActorEndpoint, Broker, BROKER}, + config::SECRET_CONFIG, + Result, +}; + +/// Start the dialer. +/// +/// Register the connection dialer with the broker (as an actor) and listen +/// for dial requests from the scheduler. Once received, use the attached +/// public key and outbound address to dial the peer by spawning the connection +/// actor. +pub async fn actor(selective_replication: bool) -> Result<()> { + // Register the connection dialer actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_broker: _, + ch_msg, + actor_id: _, + .. + } = BROKER.lock().await.register("dialer", true).await?; + + // Fuse internal termination channel with external channel. + // This allows termination of the dialer loop to be initiated from + // outside this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for dial request events via the broker message bus and dial peers. + loop { + select_biased! { + // Received termination signal. Break out of the loop. + _value = ch_terminate_fuse => { + break; + }, + // Received a message from the connection scheduler via the broker. + msg = broker_msg_ch.next().fuse() => { + if let Some(msg) = msg { + if let Some(DialRequest((public_key, addr))) = msg.downcast_ref::() { + Broker::spawn(connection::actor( + SECRET_CONFIG.get().unwrap().to_owned_identity()?, + TcpConnection::Dial { + addr: addr.to_string(), + public_key: *public_key, + }, + selective_replication, + )); + } + } + } + } + } + + Ok(()) +} diff --git a/solar/src/actors/network/lan_discovery.rs b/solar/src/actors/network/lan_discovery.rs index ff5be4f..f62999c 100644 --- a/solar/src/actors/network/lan_discovery.rs +++ b/solar/src/actors/network/lan_discovery.rs @@ -88,16 +88,13 @@ async fn process_broadcast( // Attempt to parse the IP / hostname, port and public key from the received // UDP broadcast message. - if let Some((server, port, peer_public_key)) = LanBroadcast::parse(&msg) { + if let Some((server, port, public_key)) = LanBroadcast::parse(&msg) { let addr = format!("{server}:{port}"); // Spawn a connection actor with the given connection parameters. Broker::spawn(connection::actor( server_id.clone(), - TcpConnection::Dial { - addr, - peer_public_key, - }, + TcpConnection::Dial { addr, public_key }, selective_replication, )); } else { diff --git a/solar/src/actors/network/mod.rs b/solar/src/actors/network/mod.rs index e602957..d1bcfdf 100644 --- a/solar/src/actors/network/mod.rs +++ b/solar/src/actors/network/mod.rs @@ -2,5 +2,6 @@ pub mod config; pub mod connection; pub mod connection_manager; pub mod connection_scheduler; +pub mod dialer; pub mod lan_discovery; pub mod tcp_server; diff --git a/solar/src/node.rs b/solar/src/node.rs index 23112ce..8ef0463 100644 --- a/solar/src/node.rs +++ b/solar/src/node.rs @@ -9,7 +9,8 @@ use crate::{ actors::{ jsonrpc, network::{ - connection_manager::CONNECTION_MANAGER, connection_scheduler, lan_discovery, tcp_server, + connection_manager::CONNECTION_MANAGER, connection_scheduler, dialer, lan_discovery, + tcp_server, }, }, broker::*, @@ -120,12 +121,14 @@ impl Node { // Add any connection details supplied via the `--connect` CLI option. peers_to_dial.extend(config.network.connect); - // Spawn the connection scheduler actor. Dials remote peers on an - // ongoing basis (at `eager` or `lazy` intervals). - Broker::spawn(connection_scheduler::actor( - peers_to_dial, - config.replication.selective, - )); + // Spawn the connection dialer actor. Dials remote peers as dial + // requests are received from the connection scheduler. + Broker::spawn(dialer::actor(config.replication.selective)); + + // Spawn the connection scheduler actor. Sends dial requests to the + // dialer for remote peers on an ongoing basis (at `eager` or `lazy` + // intervals). + Broker::spawn(connection_scheduler::actor(peers_to_dial)); // Spawn the connection manager message loop. let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop();