diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs new file mode 100644 index 0000000..ecfd2fb --- /dev/null +++ b/src/actors/connection_manager.rs @@ -0,0 +1,298 @@ +use std::collections::HashSet; + +use async_std::{ + sync::{Arc, RwLock}, + task, + task::JoinHandle, +}; +use futures::{select_biased, stream::StreamExt, FutureExt}; +use kuska_ssb::crypto::ed25519; +use log::trace; +use once_cell::sync::Lazy; + +use crate::{ActorEndpoint, BROKER}; + +/* +/// All possible errors while negotiating connections. +pub enum ConnectionError { + Io(std::io::Error), + Handshake, +} + +/// Peer connection data. +pub struct Connection { + /// Peer data. + peer: PeerData, + + /// Connection state. + state: ConnectionState, + + /// Connection identifier. + id: usize, +} + +impl Connection { + pub fn new() { + } +} +*/ + +/// Connection events. The `usize` represents the connection ID. +#[derive(Debug)] +pub enum ConnectionEvent { + Connecting(usize), + Handshaking(usize), + Connected(usize), + Replicating(usize), + Disconnecting(usize), + Disconnected(usize), + // TODO: use `ConnectionError` instead of `String`. + Error(usize, String), +} + +/// Connection manager (broker). +#[derive(Debug)] +pub struct ConnectionManager { + /// The public keys of all peers to whom we are currently connected. + pub connected_peers: HashSet, + /// Idle connection timeout limit. + pub idle_timeout_limit: u8, + /// ID number of the most recently registered connection. + last_connection_id: usize, + /// Message loop handle. + msgloop: Option>, + // TODO: keep a list of active connections. + // Then we can query total active connections using `.len()`. + //active_connections: HashSet, +} + +/// The connection manager for the solar node. +pub static CONNECTION_MANAGER: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(ConnectionManager::new()))); + +impl ConnectionManager { + /// Instantiate a new `ConnectionManager`. + pub fn new() -> Self { + // Spawn the connection event message loop. + let msgloop = task::spawn(Self::msg_loop()); + + Self { + last_connection_id: 0, + msgloop: Some(msgloop), + idle_timeout_limit: 30, + connected_peers: HashSet::new(), + } + } + + /// Query the number of active peer connections. + pub fn _count_connections(&self) -> usize { + self.connected_peers.len() + } + + /// Query whether the list of connected peers contains the given peer. + /// Returns `true` if the peer is in the list, otherwise a `false` value is + /// returned. + pub fn contains_connected_peer(&self, peer_id: &ed25519::PublicKey) -> bool { + self.connected_peers.contains(peer_id) + } + + /// Add a peer to the list of connected peers. + /// Returns `true` if the peer was not already in the list, otherwise a + /// `false` value is returned. + pub fn insert_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { + self.connected_peers.insert(peer_id) + } + + /// Remove a peer from the list of connected peers. + /// Returns `true` if the peer was in the list, otherwise a `false` value + /// is returned. + pub fn remove_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { + self.connected_peers.remove(&peer_id) + } + + /// Return a handle for the connection event message loop. + pub fn take_msgloop(&mut self) -> JoinHandle<()> { + self.msgloop.take().unwrap() + } + + /// Register a new connection with the connection manager. + pub fn register(&mut self) -> usize { + // Increment the last connection ID value. + self.last_connection_id += 1; + + trace!(target: "connection-manager", "registered new connection: {}", self.last_connection_id); + + self.last_connection_id + } + + /// Start the connection manager event loop. + /// + /// Listen for connection event messages via the broker and update + /// connection state accordingly. + pub async fn msg_loop() { + // Register the connection manager actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_broker: _, + ch_msg, + actor_id: _, + .. + } = BROKER + .lock() + .await + .register("connection-manager", true) + .await + .unwrap(); + + // Fuse internal termination channel with external channel. + // This allows termination of the peer loop to be initiated from outside + // this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for connection events via the broker message bus. + loop { + select_biased! { + _value = ch_terminate_fuse => { + break; + }, + msg = broker_msg_ch.next().fuse() => { + if let Some(msg) = msg { + if let Some(conn_event) = msg.downcast_ref::() { + match conn_event { + ConnectionEvent::Connecting(id) => { + trace!(target: "connection-manager", "connecting: {}", id); + } + ConnectionEvent::Handshaking(id) => { + trace!(target: "connection-manager", "handshaking: {}", id); + } + ConnectionEvent::Connected(id) => { + trace!(target: "connection-manager", "connected: {}", id); + } + ConnectionEvent::Replicating(id) => { + trace!(target: "connection-manager", "replicating: {}", id); + } + ConnectionEvent::Disconnecting(id) => { + trace!(target: "connection-manager", "disconnecting: {}", id); + } + ConnectionEvent::Disconnected(id) => { + trace!(target: "connection-manager", "disconnected: {}", id); + } + ConnectionEvent::Error(id, err) => { + trace!(target: "connection-manager", "error: {}: {}", id, err); + } + } + } + } + }, + }; + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::{config::SecretConfig, Result}; + + // A helper function to instantiate a new connection manager for each test. + // + // If we don't use this approach and simply use CONNECTION_MANAGER + // instead, we end up sharing state across tests (since they all end up + // operating on the same instance). + fn instantiate_new_connection_manager() -> Lazy>> { + Lazy::new(|| Arc::new(RwLock::new(ConnectionManager::new()))) + } + + #[async_std::test] + async fn test_connection_manager_defaults() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + let connection_idle_timeout_limit = connection_manager.read().await.idle_timeout_limit; + assert_eq!(connection_idle_timeout_limit, 30); + + let last_connection_id = connection_manager.read().await.last_connection_id; + assert_eq!(last_connection_id, 0); + + let msgloop = &connection_manager.read().await.msgloop; + assert!(msgloop.is_some()); + + let connected_peers = &connection_manager.read().await.connected_peers; + assert!(connected_peers.is_empty()); + + Ok(()) + } + + #[async_std::test] + async fn test_register_new_connection() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + for i in 1..=4 { + // Register a new connection. + let connection_id = connection_manager.write().await.register(); + + // Ensure the connection ID is incremented for each new connection. + assert_eq!(connection_id, i as usize); + } + + Ok(()) + } + + #[async_std::test] + async fn test_count_connections() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + let active_connections = connection_manager.read().await._count_connections(); + assert_eq!(active_connections, 0); + + Ok(()) + } + + #[async_std::test] + async fn test_connected_peers() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + // Create a unique keypair to sign messages. + let keypair = SecretConfig::create().owned_identity().unwrap(); + + // Insert a new connected peer. + let insert_result = connection_manager + .write() + .await + .insert_connected_peer(keypair.pk); + assert_eq!(insert_result, true); + + // Query the list of connected peers. + let query_result = connection_manager + .read() + .await + .contains_connected_peer(&keypair.pk); + assert_eq!(query_result, true); + + // Attempt to insert the same peer ID for a second time. + let reinsert_result = connection_manager + .write() + .await + .insert_connected_peer(keypair.pk); + assert_eq!(reinsert_result, false); + + // Count the active connections. + let connections = connection_manager.read().await._count_connections(); + assert_eq!(connections, 1); + + // Remove a peer from the list of connected peers. + let remove_result = connection_manager + .write() + .await + .remove_connected_peer(keypair.pk); + assert_eq!(remove_result, true); + + // Count the active connections. + let conns = connection_manager.read().await._count_connections(); + assert_eq!(conns, 0); + + Ok(()) + } +} diff --git a/src/actors/mod.rs b/src/actors/mod.rs index 8f4b5c4..1562870 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,3 +1,4 @@ +pub mod connection_manager; pub mod ctrlc; pub mod jsonrpc_server; pub mod lan_discovery; diff --git a/src/actors/peer.rs b/src/actors/peer.rs index 814aea0..8fb8a91 100644 --- a/src/actors/peer.rs +++ b/src/actors/peer.rs @@ -1,9 +1,8 @@ -use std::{collections::HashSet, net::Shutdown, time::Duration}; +use std::{net::Shutdown, time::Duration}; use async_std::{ io::{Read, Write}, net::TcpStream, - sync::{Arc, RwLock}, task, }; use futures::{pin_mut, select_biased, stream::StreamExt, FutureExt, SinkExt}; @@ -18,12 +17,14 @@ use kuska_ssb::{ rpc::{RpcReader, RpcWriter}, }; use log::{error, info, trace, warn}; -use once_cell::sync::Lazy; use crate::{ - actors::rpc::{ - BlobsGetHandler, BlobsWantsHandler, GetHandler, HistoryStreamHandler, RpcHandler, RpcInput, - WhoAmIHandler, + actors::{ + connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, + rpc::{ + BlobsGetHandler, BlobsWantsHandler, GetHandler, HistoryStreamHandler, RpcHandler, + RpcInput, WhoAmIHandler, + }, }, broker::*, config::{NETWORK_KEY, REPLICATION_CONFIG}, @@ -41,15 +42,39 @@ pub enum Connect { }, } -/// A list (`HashSet`) of public keys representing peers to whom we are -/// currently connected. -pub static CONNECTED_PEERS: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); - pub async fn actor(id: OwnedIdentity, connect: Connect, selective_replication: bool) -> Result<()> { - if let Err(err) = actor_inner(id, connect, selective_replication).await { + // Register a new connection with the connection manager. + let connection_id = CONNECTION_MANAGER.write().await.register(); + + // Set the connection idle timeout limit according to the connection + // manager configuration. This value is used to break out of the peer loop + // after n consecutive idle seconds. + let connection_idle_timeout_limit = CONNECTION_MANAGER.read().await.idle_timeout_limit; + + // Catch any errors which occur during the peer connection and replication. + if let Err(err) = actor_inner( + id, + connect, + selective_replication, + connection_id, + connection_idle_timeout_limit, + ) + .await + { warn!("peer failed: {:?}", err); + + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send 'error' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + )) + .await + .unwrap(); } + Ok(()) } @@ -60,44 +85,107 @@ pub async fn actor_inner( id: OwnedIdentity, connect: Connect, selective_replication: bool, -) -> Result<()> { + connection_id: usize, + connection_idle_timeout_limit: u8, +) -> Result { + // Register the "peer" actor endpoint with the broker. + let ActorEndpoint { + ch_terminate, + mut ch_broker, + ch_msg, + actor_id, + .. + } = BROKER.lock().await.register("peer", true).await?; + // Parse the public key and secret key from the identity. let OwnedIdentity { pk, sk, .. } = id; // Define the network key to be used for the secret handshake. let network_key = NETWORK_KEY.get().unwrap().to_owned(); + // Send 'connecting' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connecting(connection_id), + )) + .await + .unwrap(); + // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connect { - // Handle an outgoing TCP connection event. + // Handle an outbound TCP connection event. Connect::TcpServer { server, port, peer_pk, } => { + // TODO: move this check into the scheduler. + // // First check if we are already connected to the selected peer. // If yes, return immediately. // If no, continue with the connection attempt. - if CONNECTED_PEERS.read().await.contains(&peer_pk) { - return Ok(()); + if CONNECTION_MANAGER + .read() + .await + .contains_connected_peer(&peer_pk) + { + return Ok(connection_id); } // Define the server address and port. let server_port = format!("{server}:{port}"); // Attempt a TCP connection. let mut stream = TcpStream::connect(server_port).await?; + + // Send 'handshaking' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + )) + .await + .unwrap(); + // Attempt a secret handshake. let handshake = handshake_client(&mut stream, network_key, pk, sk, peer_pk).await?; info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id()); + // Send 'connected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + )) + .await + .unwrap(); + (stream, handshake) } // Handle an incoming TCP connection event. Connect::ClientStream { mut stream } => { + // Send 'handshaking' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + )) + .await + .unwrap(); + // Attempt a secret handshake. let handshake = handshake_server(&mut stream, network_key, pk, sk).await?; + // Send 'connected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + )) + .await + .unwrap(); + // Convert the public key to a `String`. let ssb_id = handshake.peer_pk.to_ssb_id(); @@ -111,10 +199,26 @@ pub async fn actor_inner( // Check if we are already connected to the selected peer. // If yes, return immediately. // If no, return the stream and handshake. - if CONNECTED_PEERS.read().await.contains(&handshake.peer_pk) { + if CONNECTION_MANAGER + .read() + .await + .contains_connected_peer(&handshake.peer_pk) + { info!("peer {} is already connected", &peer_pk); - return Ok(()); + // Since we already have an active connection to this peer, + // we can disconnect the redundant connection. + + // Send 'disconnecting' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + )) + .await + .unwrap(); + + return Ok(connection_id); } info!("💃 received connection from peer {}", &peer_pk); @@ -134,31 +238,43 @@ pub async fn actor_inner( peer_pk ); + // Send connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + )) + .await + .unwrap(); + // This may not be necessary; the connection should close when // the stream is dropped. stream.shutdown(Shutdown::Both)?; - return Ok(()); + return Ok(connection_id); } (stream, handshake) } }; - // Register the "peer" actor endpoint with the broker. - let ActorEndpoint { - ch_terminate, - mut ch_broker, - ch_msg, - actor_id, - .. - } = BROKER.lock().await.register("peer", true).await?; - // Parse the peer public key from the handshake. let peer_pk = handshake.peer_pk; // Add the peer to the list of connected peers. - CONNECTED_PEERS.write().await.insert(peer_pk); + CONNECTION_MANAGER + .write() + .await + .insert_connected_peer(peer_pk); + + // Send 'replicating' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Replicating(connection_id), + )) + .await + .unwrap(); // Spawn the peer loop (responsible for negotiating RPC requests). let res = peer_loop( @@ -168,21 +284,45 @@ pub async fn actor_inner( handshake, ch_terminate, ch_msg.unwrap(), + connection_idle_timeout_limit, ) .await; // Remove the peer from the list of connected peers. - CONNECTED_PEERS.write().await.remove(&peer_pk); + CONNECTION_MANAGER + .write() + .await + .remove_connected_peer(peer_pk); if let Err(err) = res { warn!("💀 client terminated with error {:?}", err); + + // TODO: Use the `ConnectionError` as the type for `err`. + // + // Send 'error' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + )) + .await + .unwrap(); } else { info!("👋 finished connection with {}", &peer_pk.to_ssb_id()); + + // Send 'disconnected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnected(connection_id), + )) + .await + .unwrap(); } let _ = ch_broker.send(BrokerEvent::Disconnect { actor_id }).await; - Ok(()) + Ok(connection_id) } async fn peer_loop( @@ -192,6 +332,7 @@ async fn peer_loop Result<()> { // Parse the peer public key from the handshake. let peer_ssb_id = handshake.peer_pk.to_ssb_id(); @@ -231,6 +372,16 @@ async fn peer_loop { + // Reset the timer counter. + timer_counter = 0; let (rpc_id, packet) = packet; - RpcInput::Network(rpc_id,packet) + RpcInput::Network(rpc_id, packet) }, msg = ch_msg.next().fuse() => { + // Reset the timer counter. + timer_counter = 0; if let Some(msg) = msg { RpcInput::Message(msg) } else { @@ -251,8 +406,16 @@ async fn peer_loop { - RpcInput::Timer - }, + // Break out of the peer loop if the connection idle timeout + // limit has been reached. + if timer_counter >= connection_idle_timeout_limit { + break + } else { + // Increment the timer counter. + timer_counter += 1; + RpcInput::Timer + } + } }; let mut handled = false; @@ -270,9 +433,11 @@ async fn peer_loop Ok(false), } } - // Handle an outgoing MUXRPC response. + // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { self.recv_rpc_response(api, ch_broker, *req_no, res).await } - // Handle an outgoing MUXRPC 'cancel stream' response. + // Handle an incoming MUXRPC 'cancel stream' response. RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamRespose()) => { self.recv_cancelstream(api, *req_no).await } - // Handle an outgoing MUXRPC error response. + // Handle an incoming MUXRPC error response. RpcInput::Network(req_no, rpc::RecvMsg::ErrorResponse(err)) => { self.recv_error_response(api, *req_no, err).await } diff --git a/src/main.rs b/src/main.rs index e8c8c53..c411098 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod config; mod error; mod storage; +use actors::connection_manager::CONNECTION_MANAGER; use broker::*; use config::ApplicationConfig; use storage::{blob::BlobStorage, kv::KvStorage}; @@ -104,9 +105,13 @@ async fn main() -> Result<()> { )); } + // Spawn the connection manager message loop. + let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop(); + connection_manager_msgloop.await; + // Spawn the broker message loop. - let msgloop = BROKER.lock().await.take_msgloop(); - msgloop.await; + let broker_msgloop = BROKER.lock().await.take_msgloop(); + broker_msgloop.await; println!("Gracefully finished"); diff --git a/src/storage/kv.rs b/src/storage/kv.rs index 503bc93..3724bf3 100644 --- a/src/storage/kv.rs +++ b/src/storage/kv.rs @@ -488,6 +488,12 @@ mod test { // and signed message. assert_eq!(msg_val.unwrap(), msg_2_clone); + // Get all messages comprising the feed. + let feed = kv.get_feed(&keypair.id).unwrap(); + + // Ensure that two messages are returned. + assert_eq!(feed.len(), 2); + Ok(()) }