From e0f2670c32b36216fa3b31f487659f70197340d6 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 9 Jun 2023 09:22:26 +0200 Subject: [PATCH 1/6] rename peer_public_key to public_key and add dial request type --- solar/src/actors/network/connection.rs | 12 ++++-------- .../actors/network/connection_scheduler.rs | 19 +++++++++++-------- solar/src/actors/network/lan_discovery.rs | 7 ++----- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/solar/src/actors/network/connection.rs b/solar/src/actors/network/connection.rs index bb8d885..aa74276 100644 --- a/solar/src/actors/network/connection.rs +++ b/solar/src/actors/network/connection.rs @@ -23,7 +23,7 @@ pub enum TcpConnection { /// The address of a remote peer. addr: String, /// The public key of a remote peer. - peer_public_key: ed25519::PublicKey, + public_key: ed25519::PublicKey, }, /// An inbound TCP connection. Listen { stream: TcpStream }, @@ -151,13 +151,10 @@ pub async fn actor_inner( // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connection { // Handle an outbound TCP connection event. - TcpConnection::Dial { - addr, - peer_public_key, - } => { + TcpConnection::Dial { addr, public_key } => { // Update the data associated with this connection. connection_data.peer_addr = Some(addr.to_owned()); - connection_data.peer_public_key = Some(peer_public_key); + connection_data.peer_public_key = Some(public_key); // Send 'connecting' connection event message via the broker. ch_broker @@ -180,8 +177,7 @@ pub async fn actor_inner( // Attempt a secret handshake. let handshake = - handshake_client(&mut stream, network_key.to_owned(), pk, sk, peer_public_key) - .await?; + handshake_client(&mut stream, network_key.to_owned(), pk, sk, public_key).await?; info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id()); diff --git a/solar/src/actors/network/connection_scheduler.rs b/solar/src/actors/network/connection_scheduler.rs index 601486b..d155dd7 100644 --- a/solar/src/actors/network/connection_scheduler.rs +++ b/solar/src/actors/network/connection_scheduler.rs @@ -32,6 +32,9 @@ use crate::{ Result, }; +/// A request to dial the peer identified by the given public key. +pub struct DialRequest(pub PublicKey); + #[derive(Debug)] struct ConnectionScheduler { /// Peers with whom the last connection attempt was successful. @@ -148,11 +151,11 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) eager_tick = eager_ticker.next() => { if let Some(_tick) = eager_tick { // Pop a peer from the queue of eager peers. - if let Some((peer_public_key, addr)) = scheduler.eager_peers.pop_front() { + if let Some((public_key, addr)) = scheduler.eager_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. - if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) { - scheduler.eager_peers.push_back((peer_public_key, addr)) + if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { + scheduler.eager_peers.push_back((public_key, addr)) } else { // Otherwise, dial the peer. Broker::spawn(connection::actor( @@ -160,7 +163,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) SECRET_CONFIG.get().unwrap().to_owned_identity()?, TcpConnection::Dial { addr, - peer_public_key, + public_key, }, selective_replication, )); @@ -172,18 +175,18 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) lazy_tick = lazy_ticker.next() => { if let Some(_tick) = lazy_tick { // Pop a peer from the queue of lazy peers. - if let Some((peer_public_key, addr)) = scheduler.lazy_peers.pop_front() { + if let Some((public_key, addr)) = scheduler.lazy_peers.pop_front() { // Check if we're already connected to this peer. If so, // push them to the back of the eager queue. - if CONNECTION_MANAGER.read().await.contains_connected_peer(&peer_public_key) { - scheduler.eager_peers.push_back((peer_public_key, addr)) + if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { + scheduler.eager_peers.push_back((public_key, addr)) } else { // Otherwise, dial the peer. Broker::spawn(connection::actor( SECRET_CONFIG.get().unwrap().to_owned_identity()?, TcpConnection::Dial { addr, - peer_public_key, + public_key, }, selective_replication, )); diff --git a/solar/src/actors/network/lan_discovery.rs b/solar/src/actors/network/lan_discovery.rs index ff5be4f..f62999c 100644 --- a/solar/src/actors/network/lan_discovery.rs +++ b/solar/src/actors/network/lan_discovery.rs @@ -88,16 +88,13 @@ async fn process_broadcast( // Attempt to parse the IP / hostname, port and public key from the received // UDP broadcast message. - if let Some((server, port, peer_public_key)) = LanBroadcast::parse(&msg) { + if let Some((server, port, public_key)) = LanBroadcast::parse(&msg) { let addr = format!("{server}:{port}"); // Spawn a connection actor with the given connection parameters. Broker::spawn(connection::actor( server_id.clone(), - TcpConnection::Dial { - addr, - peer_public_key, - }, + TcpConnection::Dial { addr, public_key }, selective_replication, )); } else { From 8ffcf06e75b27d2d8762650ba75a426dfb0a2561 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 9 Jun 2023 09:22:45 +0200 Subject: [PATCH 2/6] add dialer actor --- solar/src/actors/network/dialer.rs | 77 ++++++++++++++++++++++++++++++ solar/src/actors/network/mod.rs | 1 + 2 files changed, 78 insertions(+) create mode 100644 solar/src/actors/network/dialer.rs diff --git a/solar/src/actors/network/dialer.rs b/solar/src/actors/network/dialer.rs new file mode 100644 index 0000000..47524ea --- /dev/null +++ b/solar/src/actors/network/dialer.rs @@ -0,0 +1,77 @@ +//! Dialer +//! +//! Dial requests are received from the connection scheduler via the broker +//! message bus. Each request includes the public key of the peer to be dialed. +//! Upon receiving a request, the dialer queries the address book to determine +//! the associated address for the peer and then spawns the connection actor. + +use futures::{select_biased, FutureExt, StreamExt}; +use kuska_ssb::crypto::ed25519::PublicKey; + +use crate::{ + actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest}, + broker::{ActorEndpoint, Broker, BROKER}, + config::SECRET_CONFIG, + Result, +}; + +/// Start the dialer. +/// +/// Register the connection dialer with the broker (as an actor) and listen +/// for dial requests from the scheduler. Once received, use the attached +/// public key to lookup the outbound address from the address book and dial +/// the peer by spawning the connection actor. +pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> { + // Register the connection dialer actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_broker: _, + ch_msg, + actor_id: _, + .. + } = BROKER.lock().await.register("dialer", true).await?; + + // Fuse internal termination channel with external channel. + // This allows termination of the dialer loop to be initiated from + // outside this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for dial-peer events via the broker message bus, lookup + // the associated address and dial the peer. + loop { + select_biased! { + // Received termination signal. Break out of the loop. + _value = ch_terminate_fuse => { + break; + }, + // Received a message from the connection scheduler via the broker. + msg = broker_msg_ch.next().fuse() => { + if let Some(msg) = msg { + if let Some(dial_request) = msg.downcast_ref::() { + match dial_request { + DialRequest(public_key) => { + // Retrieve the address associated with this key. + if let addr = AddressBook::get(public_key) { + // Spawn the connection actor. + Broker::spawn(connection::actor( + SECRET_CONFIG.get().unwrap().to_owned_identity()?, + TcpConnection::Dial { + addr, + public_key, + }, + selective_replication, + )); + } + } + _ => (), + } + } + } + } + } + } + + Ok(()) +} diff --git a/solar/src/actors/network/mod.rs b/solar/src/actors/network/mod.rs index e602957..d1bcfdf 100644 --- a/solar/src/actors/network/mod.rs +++ b/solar/src/actors/network/mod.rs @@ -2,5 +2,6 @@ pub mod config; pub mod connection; pub mod connection_manager; pub mod connection_scheduler; +pub mod dialer; pub mod lan_discovery; pub mod tcp_server; From 5a695edea0ce4638ae3f6b25ea1a48b87fb804a4 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Mon, 12 Jun 2023 13:58:18 +0200 Subject: [PATCH 3/6] emit dial requests instead of dialing peers directly --- .../actors/network/connection_scheduler.rs | 92 ++++++++++--------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/solar/src/actors/network/connection_scheduler.rs b/solar/src/actors/network/connection_scheduler.rs index d155dd7..6d1c618 100644 --- a/solar/src/actors/network/connection_scheduler.rs +++ b/solar/src/actors/network/connection_scheduler.rs @@ -2,7 +2,8 @@ //! //! Peers to be dialed are added to the scheduler, with the SSB public key and an address being //! provided for each one. These peers are initially placed into an "eager" queue by the scheduler. -//! Each peer is dialed, one by one, with a delay of x seconds between each dial attempt. +//! A dial request is emitted for each peer in the "eager" queue, one by one, with a delay of 5 +//! seconds between each request. //! //! If the connection and handshake are successful, the peer is pushed to the back of the "eager" //! queue once the connection is complete. @@ -10,30 +11,48 @@ //! If the connection or handshake are unsuccessful, the peer is pushed to the back of the "lazy" //! queue once the connection is complete. //! -//! Each peer in the "lazy" queue is dialed, one by one, with a delay of x * 10 seconds between -//! each dial attempt. +//! A dial request is emitted for each peer in the "lazy" queue, one by one, with a delay of 61 +//! seconds between each request. //! //! The success or failure of each dial attempt is determined by listening to connection events from //! the connection manager. This allows peers to be moved between queues when required. -use std::{collections::VecDeque, time::Duration}; +use std::{collections::VecDeque, fmt::Display, time::Duration}; use async_std::stream; -use futures::{select_biased, stream::StreamExt, FutureExt}; -use kuska_ssb::crypto::ed25519::PublicKey; +use futures::{select_biased, stream::StreamExt, FutureExt, SinkExt}; +use kuska_ssb::crypto::{ed25519::PublicKey, ToSsbId}; +use log::debug; use crate::{ - actors::network::{ - connection, - connection::TcpConnection, - connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, - }, - broker::{ActorEndpoint, Broker, BROKER}, - config::SECRET_CONFIG, + actors::network::connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, + broker::{ActorEndpoint, BrokerEvent, Destination, BROKER}, Result, }; -/// A request to dial the peer identified by the given public key. -pub struct DialRequest(pub PublicKey); +/// A request to dial the peer identified by the given public key and address. +pub struct DialRequest(pub (PublicKey, String)); + +// Custom `Display` implementation so we can easily log dial requests. +impl Display for DialRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (peer_public_key, peer_addr) = match &self { + DialRequest((key, addr)) => { + let ssb_id = key.to_ssb_id(); + let peer_public_key = if ssb_id.starts_with('@') { + ssb_id + } else { + format!("@{}", ssb_id) + }; + + let peer_addr = addr.to_string(); + + (peer_public_key, peer_addr) + } + }; + + write!(f, "", peer_public_key, peer_addr) + } +} #[derive(Debug)] struct ConnectionScheduler { @@ -47,7 +66,7 @@ struct ConnectionScheduler { /// Defaults to 5 seconds. eager_interval: Duration, /// The interval in seconds between dial attempts for lazy peers. - /// Defaults to 60 seconds. + /// Defaults to 61 seconds. lazy_interval: Duration, } @@ -57,7 +76,7 @@ impl Default for ConnectionScheduler { eager_peers: VecDeque::new(), lazy_peers: VecDeque::new(), eager_interval: Duration::from_secs(5), - lazy_interval: Duration::from_secs(60), + lazy_interval: Duration::from_secs(61), } } } @@ -99,14 +118,14 @@ impl ConnectionScheduler { /// Start the connection scheduler. /// /// Register the connection scheduler with the broker (as an actor), start -/// the eager and lazy dialers and listen for connection events emitted by -/// the connection manager. Update the eager and lazy peer queues according -/// to connection outcomes. -pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> { +/// the eager and lazy dial request emitters and listen for connection events +/// emitted by the connection manager. Update the eager and lazy peer queues +/// according to connection outcomes. +pub async fn actor(peers: Vec<(PublicKey, String)>) -> Result<()> { // Register the connection scheduler actor with the broker. let ActorEndpoint { ch_terminate, - ch_broker: _, + mut ch_broker, ch_msg, actor_id: _, .. @@ -128,7 +147,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) // Create the tickers (aka. metronomes) which will emit messages at // the predetermined interval. These tickers control the rates at which - // we dial peers. + // we emit dial requests for peers. let mut eager_ticker = stream::interval(scheduler.eager_interval).fuse(); let mut lazy_ticker = stream::interval(scheduler.lazy_interval).fuse(); @@ -157,16 +176,10 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { scheduler.eager_peers.push_back((public_key, addr)) } else { - // Otherwise, dial the peer. - Broker::spawn(connection::actor( - // TODO: make this neater once config-sharing story has improved. - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr, - public_key, - }, - selective_replication, - )); + // Otherwise, send a dial request to the dialer. + let dial_request = DialRequest((public_key, addr)); + debug!("{}", dial_request); + ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await? } } } @@ -181,15 +194,10 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) if CONNECTION_MANAGER.read().await.contains_connected_peer(&public_key) { scheduler.eager_peers.push_back((public_key, addr)) } else { - // Otherwise, dial the peer. - Broker::spawn(connection::actor( - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr, - public_key, - }, - selective_replication, - )); + // Otherwise, send a dial request to the dialer. + let dial_request = DialRequest((public_key, addr)); + debug!("{}", dial_request); + ch_broker.send(BrokerEvent::new(Destination::Broadcast, dial_request)).await? } } } From 38f2814c7c8ab6a95ad1ad849efe2d350d962c7f Mon Sep 17 00:00:00 2001 From: mycognosist Date: Mon, 12 Jun 2023 13:59:20 +0200 Subject: [PATCH 4/6] fix dial request matching and update actor signature --- solar/src/actors/network/dialer.rs | 41 +++++++++++++----------------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/solar/src/actors/network/dialer.rs b/solar/src/actors/network/dialer.rs index 47524ea..e30f4b6 100644 --- a/solar/src/actors/network/dialer.rs +++ b/solar/src/actors/network/dialer.rs @@ -1,12 +1,9 @@ //! Dialer //! //! Dial requests are received from the connection scheduler via the broker -//! message bus. Each request includes the public key of the peer to be dialed. -//! Upon receiving a request, the dialer queries the address book to determine -//! the associated address for the peer and then spawns the connection actor. - +//! message bus. Each request includes the public key and address of the peer +//! to be dialed. Upon receiving a request, the dialer spawns the connection actor. use futures::{select_biased, FutureExt, StreamExt}; -use kuska_ssb::crypto::ed25519::PublicKey; use crate::{ actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest}, @@ -19,9 +16,9 @@ use crate::{ /// /// Register the connection dialer with the broker (as an actor) and listen /// for dial requests from the scheduler. Once received, use the attached -/// public key to lookup the outbound address from the address book and dial -/// the peer by spawning the connection actor. -pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) -> Result<()> { +/// public key and outbound address to dial the peer by spawning the connection +/// actor. +pub async fn actor(selective_replication: bool) -> Result<()> { // Register the connection dialer actor with the broker. let ActorEndpoint { ch_terminate, @@ -38,8 +35,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) let mut broker_msg_ch = ch_msg.unwrap(); - // Listen for dial-peer events via the broker message bus, lookup - // the associated address and dial the peer. + // Listen for dial request events via the broker message bus and dial peers. loop { select_biased! { // Received termination signal. Break out of the loop. @@ -49,23 +45,20 @@ pub async fn actor(peers: Vec<(PublicKey, String)>, selective_replication: bool) // Received a message from the connection scheduler via the broker. msg = broker_msg_ch.next().fuse() => { if let Some(msg) = msg { + // TODO: see if it's possible to downcast (ie. no ref). if let Some(dial_request) = msg.downcast_ref::() { match dial_request { - DialRequest(public_key) => { - // Retrieve the address associated with this key. - if let addr = AddressBook::get(public_key) { - // Spawn the connection actor. - Broker::spawn(connection::actor( - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr, - public_key, - }, - selective_replication, - )); - } + DialRequest((public_key, addr)) => { + // Spawn the connection actor. + Broker::spawn(connection::actor( + SECRET_CONFIG.get().unwrap().to_owned_identity()?, + TcpConnection::Dial { + addr: addr.to_string(), + public_key: *public_key, + }, + selective_replication, + )); } - _ => (), } } } From 8cccb7063b3d719f73699dab22cef0bf1690980a Mon Sep 17 00:00:00 2001 From: mycognosist Date: Mon, 12 Jun 2023 13:59:49 +0200 Subject: [PATCH 5/6] spawn the dialer actor in the node --- solar/src/node.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/solar/src/node.rs b/solar/src/node.rs index 23112ce..8ef0463 100644 --- a/solar/src/node.rs +++ b/solar/src/node.rs @@ -9,7 +9,8 @@ use crate::{ actors::{ jsonrpc, network::{ - connection_manager::CONNECTION_MANAGER, connection_scheduler, lan_discovery, tcp_server, + connection_manager::CONNECTION_MANAGER, connection_scheduler, dialer, lan_discovery, + tcp_server, }, }, broker::*, @@ -120,12 +121,14 @@ impl Node { // Add any connection details supplied via the `--connect` CLI option. peers_to_dial.extend(config.network.connect); - // Spawn the connection scheduler actor. Dials remote peers on an - // ongoing basis (at `eager` or `lazy` intervals). - Broker::spawn(connection_scheduler::actor( - peers_to_dial, - config.replication.selective, - )); + // Spawn the connection dialer actor. Dials remote peers as dial + // requests are received from the connection scheduler. + Broker::spawn(dialer::actor(config.replication.selective)); + + // Spawn the connection scheduler actor. Sends dial requests to the + // dialer for remote peers on an ongoing basis (at `eager` or `lazy` + // intervals). + Broker::spawn(connection_scheduler::actor(peers_to_dial)); // Spawn the connection manager message loop. let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop(); From 3954a752ab224350aff1b91f7df01e4a7943ab9b Mon Sep 17 00:00:00 2001 From: mycognosist Date: Mon, 12 Jun 2023 14:10:04 +0200 Subject: [PATCH 6/6] make dial request matching more concise --- solar/src/actors/network/dialer.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/solar/src/actors/network/dialer.rs b/solar/src/actors/network/dialer.rs index e30f4b6..2e91100 100644 --- a/solar/src/actors/network/dialer.rs +++ b/solar/src/actors/network/dialer.rs @@ -45,21 +45,15 @@ pub async fn actor(selective_replication: bool) -> Result<()> { // Received a message from the connection scheduler via the broker. msg = broker_msg_ch.next().fuse() => { if let Some(msg) = msg { - // TODO: see if it's possible to downcast (ie. no ref). - if let Some(dial_request) = msg.downcast_ref::() { - match dial_request { - DialRequest((public_key, addr)) => { - // Spawn the connection actor. - Broker::spawn(connection::actor( - SECRET_CONFIG.get().unwrap().to_owned_identity()?, - TcpConnection::Dial { - addr: addr.to_string(), - public_key: *public_key, - }, - selective_replication, - )); - } - } + if let Some(DialRequest((public_key, addr))) = msg.downcast_ref::() { + Broker::spawn(connection::actor( + SECRET_CONFIG.get().unwrap().to_owned_identity()?, + TcpConnection::Dial { + addr: addr.to_string(), + public_key: *public_key, + }, + selective_replication, + )); } } }