From cdb56428abd9db324a1f90ba0f140182deecfa4a Mon Sep 17 00:00:00 2001 From: mycognosist Date: Fri, 13 Jan 2023 19:38:34 +0200 Subject: [PATCH 1/8] inital mapping with TODO comments --- src/actors/connection_manager.rs | 54 ++++++++++++++++++++++++++++++++ src/actors/peer.rs | 26 +++++++++++++++ src/actors/tcp_server.rs | 4 +++ 3 files changed, 84 insertions(+) create mode 100644 src/actors/connection_manager.rs diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs new file mode 100644 index 0000000..450467e --- /dev/null +++ b/src/actors/connection_manager.rs @@ -0,0 +1,54 @@ +// We need to get data out of the `peer_loop` in the peer actor. +// How to achieve that? +// One option is to use a message passing channel. +// Should we use the existing broker messaging system? +// Or create a secondary event channel? + +// I think we should start by leveraging the existing system. + +// The peer loop sends a `BrokerEvent::Message` to the connection +// manager actor. The type of that message value should be `ConnectionEvent`. + +/// All possible errors while negotiating connections. +pub enum ConnectionError { + Io(std::io::Error), + Handshake, +} + +/// Connection data for a peer. +pub struct Connection { + /// Peer data. + peer: PeerData, + + /// Connection state. + state: ConnectionState, +} + +#[derive(Debug)] +/// The state of the connection. +pub enum ConnectionState { + Ready, + // Does the message passing approach negate the need for a stateful + // struct? At least in terms of the TcpConnection, TcpStream and + // HandshakeComplete? + Connecting(TcpConnection), + Connected(TcpStream, HandshakeComplete), + Handshaking, + Replicating, + Disconnecting(PublicKey), + Disconnected, + Finished, + Error(Option, Option), +} + +#[derive(Debug)] +/// Connection events. +pub enum ConnectionEvent { + Connecting, + Connected, + Handshaking, + Replicating, + Disconnecting, + Disconnected, + Error(ConnectionError), +} diff --git a/src/actors/peer.rs b/src/actors/peer.rs index 7de1e99..2b14994 100644 --- a/src/actors/peer.rs +++ b/src/actors/peer.rs @@ -67,6 +67,9 @@ pub async fn actor_inner( // Define the network key to be used for the secret handshake. let network_key = NETWORK_KEY.get().unwrap().to_owned(); + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Connecting + // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connect { // Handle an outgoing TCP connection event. @@ -86,6 +89,13 @@ pub async fn actor_inner( let server_port = format!("{}:{}", server, port); // Attempt a TCP connection. let mut stream = TcpStream::connect(server_port).await?; + + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Connected + + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Handshaking + // Attempt a secret handshake. let handshake = handshake_client(&mut stream, network_key, pk, sk, peer_pk).await?; @@ -95,6 +105,9 @@ pub async fn actor_inner( } // Handle an incoming TCP connection event. Connect::ClientStream { mut stream } => { + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Handshaking + // Attempt a secret handshake. let handshake = handshake_server(&mut stream, network_key, pk, sk).await?; @@ -157,6 +170,9 @@ pub async fn actor_inner( // Parse the peer public key from the handshake. let peer_pk = handshake.peer_pk; + // TODO: + // Remember to change the placement of this. + // Add the peer to the list of connected peers. CONNECTED_PEERS.write().await.insert(peer_pk); @@ -176,8 +192,14 @@ pub async fn actor_inner( if let Err(err) = res { warn!("💀 client terminated with error {:?}", err); + + // TODO: pass message to the connection manager via the broker + // ConnectionEvent::Error(err) + // HINT: Use the `ConnectionError` as the type for `err`. } else { info!("👋 finished connection with {}", &peer_pk.to_ssb_id()); + // TODO: pass message to the connection manager via the broker + // ConnectionEvent::Disconnected } let _ = ch_broker.send(BrokerEvent::Disconnect { actor_id }).await; @@ -231,6 +253,10 @@ async fn peer_loop break, stream = incoming.next().fuse() => { + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Connecting if let Some(stream) = stream { if let Ok(stream) = stream { + // TODO: pass message to the broker for the connection manager: + // ConnectionEvent::Connected Broker::spawn(super::peer::actor(server_id.clone(), super::peer::Connect::ClientStream{stream}, selective_replication)); } } else { From 4e887996219056e7a33164fbac35c2e1052dd490 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Mon, 30 Jan 2023 10:39:45 +0200 Subject: [PATCH 2/8] add basic connection manager with event logging --- src/actors/connection_manager.rs | 239 ++++++++++++++++++++++++++----- src/actors/mod.rs | 1 + src/actors/peer.rs | 161 ++++++++++++++++----- src/actors/rpc/history_stream.rs | 2 +- src/main.rs | 9 +- 5 files changed, 340 insertions(+), 72 deletions(-) diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs index 450467e..27d088c 100644 --- a/src/actors/connection_manager.rs +++ b/src/actors/connection_manager.rs @@ -1,54 +1,227 @@ -// We need to get data out of the `peer_loop` in the peer actor. -// How to achieve that? -// One option is to use a message passing channel. -// Should we use the existing broker messaging system? -// Or create a secondary event channel? +use async_std::{sync::Mutex, task, task::JoinHandle}; +use futures::{select_biased, stream::StreamExt, FutureExt}; +use log::trace; +use once_cell::sync::Lazy; -// I think we should start by leveraging the existing system. - -// The peer loop sends a `BrokerEvent::Message` to the connection -// manager actor. The type of that message value should be `ConnectionEvent`. +use crate::{ActorEndpoint, BROKER}; +/* /// All possible errors while negotiating connections. pub enum ConnectionError { Io(std::io::Error), Handshake, } -/// Connection data for a peer. +/// Peer connection data. pub struct Connection { /// Peer data. peer: PeerData, /// Connection state. state: ConnectionState, + + /// Connection identifier. + id: usize, } -#[derive(Debug)] -/// The state of the connection. -pub enum ConnectionState { - Ready, - // Does the message passing approach negate the need for a stateful - // struct? At least in terms of the TcpConnection, TcpStream and - // HandshakeComplete? - Connecting(TcpConnection), - Connected(TcpStream, HandshakeComplete), - Handshaking, - Replicating, - Disconnecting(PublicKey), - Disconnected, - Finished, - Error(Option, Option), +impl Connection { + pub fn new() { + } } +*/ +/// Connection events. The `usize` represents the connection ID. #[derive(Debug)] -/// Connection events. pub enum ConnectionEvent { - Connecting, - Connected, - Handshaking, - Replicating, - Disconnecting, - Disconnected, - Error(ConnectionError), + Connecting(usize), + Handshaking(usize), + Connected(usize), + Replicating(usize), + Disconnecting(usize), + Disconnected(usize), + // TODO: use `ConnectionError` instead of `String`. + Error(usize, String), +} + +/// Connection manager (broker). +#[derive(Debug)] +pub struct ConnectionManager { + /// ID number of the most recently registered connection. + last_connection_id: usize, + /// Message loop handle. + msgloop: Option>, + //active_connections: usize, +} + +pub static CONNECTION_MANAGER: Lazy> = + Lazy::new(|| Mutex::new(ConnectionManager::new())); + +impl ConnectionManager { + /// Instantiate a new `ConnectionManager` instance. + pub fn new() -> Self { + // Spawn the connection event message loop. + let msgloop = task::spawn(Self::msg_loop()); + + Self { + last_connection_id: 0, + msgloop: Some(msgloop), + } + } + + /// Return a handle for the connection event message loop. + pub fn take_msgloop(&mut self) -> JoinHandle<()> { + self.msgloop.take().unwrap() + } + + /// Register a new connection with the connection manager. + pub fn register(&mut self) -> usize { + // Increment the last connection ID value. + self.last_connection_id += 1; + + trace!(target: "connection-manager", "registering new connection: {}", self.last_connection_id); + + self.last_connection_id + } + + /// Start the connection manager event loop. + /// + /// Listen for connection event messages via the broker and update + /// connection state accordingly. + pub async fn msg_loop() { + // Register the connection manager actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_broker: _, + ch_msg, + actor_id: _, + .. + } = BROKER + .lock() + .await + .register("connection-manager", true) + .await + .unwrap(); + + // Fuse internal termination channel with external channel. + // This allows termination of the peer loop to be initiated from outside + // this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for connection events via the broker message bus. + loop { + select_biased! { + _value = ch_terminate_fuse => { + break; + }, + msg = broker_msg_ch.next().fuse() => { + if let Some(msg) = msg { + if let Some(conn_event) = msg.downcast_ref::() { + match conn_event { + ConnectionEvent::Connecting(id) => { + trace!(target: "connection-manager", "connecting: {}", id); + } + ConnectionEvent::Handshaking(id) => { + trace!(target: "connection-manager", "handshaking: {}", id); + } + ConnectionEvent::Connected(id) => { + trace!(target: "connection-manager", "connected: {}", id); + } + ConnectionEvent::Replicating(id) => { + trace!(target: "connection-manager", "replicating: {}", id); + } + ConnectionEvent::Disconnecting(id) => { + trace!(target: "connection-manager", "disconnecting: {}", id); + } + ConnectionEvent::Disconnected(id) => { + trace!(target: "connection-manager", "disconnected: {}", id); + } + ConnectionEvent::Error(id, err) => { + trace!(target: "connection-manager", "error: {}: {}", id, err); + } + } + } + } + }, + }; + } + } +} + +/* + +// LEFT-OVER CODE FROM ACTOR APPROCH + +// TODO: remove this code when certain of the msgloop approach used above + +pub async fn actor() -> Result<()> { + if let Err(err) = actor_inner().await { + warn!("connection manager failed: {}", err); + } + Ok(()) +} + +pub async fn actor_inner() -> Result<()> { + // Register the connection manager actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_broker: _, + ch_msg, + actor_id: _, + .. + } = BROKER + .lock() + .await + .register("connection-manager", true) + .await?; + + let _connection_manager = ConnectionManager::new(); + + // Fuse internal termination channel with external channel. + // This allows termination of the peer loop to be initiated from outside + // this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for connection events via the broker message bus. + loop { + select_biased! { + _value = ch_terminate_fuse => { + break; + }, + msg = broker_msg_ch.next().fuse() => { + if let Some(msg) = msg { + if let Some(conn_event) = msg.downcast_ref::() { + match conn_event { + ConnectionEvent::Connecting(id) => { + trace!(target: "connection-manager", "Connecting: {}", id); + } + ConnectionEvent::Handshaking(id) => { + trace!(target: "connection-manager", "Handshaking: {}", id); + } + ConnectionEvent::Connected(id) => { + trace!(target: "connection-manager", "Connected: {}", id); + } ConnectionEvent::Replicating(id) => { + trace!(target: "connection-manager", "Replicating: {}", id); + } + ConnectionEvent::Disconnecting(id) => { + trace!(target: "connection-manager", "Disconnecting: {}", id); + } + ConnectionEvent::Disconnected(id) => { + trace!(target: "connection-manager", "Disconnected: {}", id); + } + ConnectionEvent::Error(id, err) => { + trace!(target: "connection-manager", "Error: {}: {}", id, err); + } + } + } + } + }, + }; + } + + Ok(()) } +*/ diff --git a/src/actors/mod.rs b/src/actors/mod.rs index 8f4b5c4..1562870 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,3 +1,4 @@ +pub mod connection_manager; pub mod ctrlc; pub mod jsonrpc_server; pub mod lan_discovery; diff --git a/src/actors/peer.rs b/src/actors/peer.rs index 2b14994..c45f32e 100644 --- a/src/actors/peer.rs +++ b/src/actors/peer.rs @@ -21,9 +21,12 @@ use log::{error, info, trace, warn}; use once_cell::sync::Lazy; use crate::{ - actors::rpc::{ - BlobsGetHandler, BlobsWantsHandler, GetHandler, HistoryStreamHandler, RpcHandler, RpcInput, - WhoAmIHandler, + actors::{ + connection_manager::{ConnectionEvent, CONNECTION_MANAGER}, + rpc::{ + BlobsGetHandler, BlobsWantsHandler, GetHandler, HistoryStreamHandler, RpcHandler, + RpcInput, WhoAmIHandler, + }, }, broker::*, config::{NETWORK_KEY, REPLICATION_CONFIG}, @@ -41,15 +44,32 @@ pub enum Connect { }, } +// TODO: move this to the connection manager /// A list (`HashSet`) of public keys representing peers to whom we are /// currently connected. pub static CONNECTED_PEERS: Lazy>>> = Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); pub async fn actor(id: OwnedIdentity, connect: Connect, selective_replication: bool) -> Result<()> { - if let Err(err) = actor_inner(id, connect, selective_replication).await { + // Register a new connection with the connection manager. + let connection_id = CONNECTION_MANAGER.lock().await.register(); + + // Catch any errors which occur during the peer connection and replication. + if let Err(err) = actor_inner(id, connect, selective_replication, connection_id).await { warn!("peer failed: {:?}", err); + + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send a connection event. + let connecting_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + ); + + // Send connection event message via the broker. + ch_broker.send(connecting_msg).await.unwrap(); } + Ok(()) } @@ -60,19 +80,38 @@ pub async fn actor_inner( id: OwnedIdentity, connect: Connect, selective_replication: bool, -) -> Result<()> { + connection_id: usize, +) -> Result { + // Register the "peer" actor endpoint with the broker. + let ActorEndpoint { + ch_terminate, + mut ch_broker, + ch_msg, + actor_id, + .. + } = BROKER.lock().await.register("peer", true).await?; + // Parse the public key and secret key from the identity. let OwnedIdentity { pk, sk, .. } = id; // Define the network key to be used for the secret handshake. let network_key = NETWORK_KEY.get().unwrap().to_owned(); - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Connecting + // TODO: think of ways to send a directed message instead of broadcast. + // All we need is the actor ID of the connection manager. + // + // Send a connection event. + let connecting_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connecting(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(connecting_msg).await.unwrap(); // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connect { - // Handle an outgoing TCP connection event. + // Handle an outbound TCP connection event. Connect::TcpServer { server, port, @@ -82,7 +121,7 @@ pub async fn actor_inner( // If yes, return immediately. // If no, continue with the connection attempt. if CONNECTED_PEERS.read().await.contains(&peer_pk) { - return Ok(()); + return Ok(connection_id); } // Define the server address and port. @@ -90,27 +129,50 @@ pub async fn actor_inner( // Attempt a TCP connection. let mut stream = TcpStream::connect(server_port).await?; - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Connected + let handshaking_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + ); - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Handshaking + // Send connection event message via the broker. + ch_broker.send(handshaking_msg).await.unwrap(); // Attempt a secret handshake. let handshake = handshake_client(&mut stream, network_key, pk, sk, peer_pk).await?; info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id()); + let connected_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(connected_msg).await.unwrap(); + (stream, handshake) } // Handle an incoming TCP connection event. Connect::ClientStream { mut stream } => { - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Handshaking + let handshaking_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(handshaking_msg).await.unwrap(); // Attempt a secret handshake. let handshake = handshake_server(&mut stream, network_key, pk, sk).await?; + let connected_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(connected_msg).await.unwrap(); + // Convert the public key to a `String`. let ssb_id = handshake.peer_pk.to_ssb_id(); @@ -127,7 +189,17 @@ pub async fn actor_inner( if CONNECTED_PEERS.read().await.contains(&handshake.peer_pk) { info!("peer {} is already connected", &peer_pk); - return Ok(()); + // Since we already have an active connection to this peer, + // we can disconnect the redundant connection. + let disconnecting_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(disconnecting_msg).await.unwrap(); + + return Ok(connection_id); } info!("💃 received connection from peer {}", &peer_pk); @@ -147,26 +219,25 @@ pub async fn actor_inner( peer_pk ); + let disconnecting_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(disconnecting_msg).await.unwrap(); + // This may not be necessary; the connection should close when // the stream is dropped. stream.shutdown(Shutdown::Both)?; - return Ok(()); + return Ok(connection_id); } (stream, handshake) } }; - // Register the "peer" actor endpoint with the broker. - let ActorEndpoint { - ch_terminate, - mut ch_broker, - ch_msg, - actor_id, - .. - } = BROKER.lock().await.register("peer", true).await?; - // Parse the peer public key from the handshake. let peer_pk = handshake.peer_pk; @@ -176,6 +247,14 @@ pub async fn actor_inner( // Add the peer to the list of connected peers. CONNECTED_PEERS.write().await.insert(peer_pk); + let replicating_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Replicating(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(replicating_msg).await.unwrap(); + // Spawn the peer loop (responsible for negotiating RPC requests). let res = peer_loop( actor_id, @@ -193,18 +272,32 @@ pub async fn actor_inner( if let Err(err) = res { warn!("💀 client terminated with error {:?}", err); - // TODO: pass message to the connection manager via the broker - // ConnectionEvent::Error(err) - // HINT: Use the `ConnectionError` as the type for `err`. + // TODO: Use the `ConnectionError` as the type for `err`. + // + // Communicate the disconnection to the connection manager. + let error_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + ); + + // Send connection event message via the broker. + ch_broker.send(error_msg).await.unwrap(); } else { info!("👋 finished connection with {}", &peer_pk.to_ssb_id()); - // TODO: pass message to the connection manager via the broker - // ConnectionEvent::Disconnected + + // Communicate the disconnection to the connection manager. + let disconnected_msg = BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnected(connection_id), + ); + + // Send connection event message via the broker. + ch_broker.send(disconnected_msg).await.unwrap(); } let _ = ch_broker.send(BrokerEvent::Disconnect { actor_id }).await; - Ok(()) + Ok(connection_id) } async fn peer_loop( @@ -253,10 +346,6 @@ async fn peer_loop Result<()> { )); } + // Spawn the connection manager message loop. + let connection_manager_msgloop = CONNECTION_MANAGER.lock().await.take_msgloop(); + connection_manager_msgloop.await; + // Spawn the broker message loop. - let msgloop = BROKER.lock().await.take_msgloop(); - msgloop.await; + let broker_msgloop = BROKER.lock().await.take_msgloop(); + broker_msgloop.await; println!("Gracefully finished"); From 3b5c58423d291fd9a14c99fe87656a6578a8c01e Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 2 Feb 2023 10:11:50 +0200 Subject: [PATCH 3/8] add trace logging to peer loop --- src/actors/peer.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/actors/peer.rs b/src/actors/peer.rs index c45f32e..e9464fa 100644 --- a/src/actors/peer.rs +++ b/src/actors/peer.rs @@ -311,6 +311,8 @@ async fn peer_loop Date: Thu, 2 Feb 2023 10:12:29 +0200 Subject: [PATCH 4/8] add tracing to history stream handler and update comments --- src/actors/rpc/history_stream.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/actors/rpc/history_stream.rs b/src/actors/rpc/history_stream.rs index 910d1d4..2f3b2ea 100644 --- a/src/actors/rpc/history_stream.rs +++ b/src/actors/rpc/history_stream.rs @@ -8,7 +8,7 @@ use kuska_ssb::{ feed::{Feed as MessageKvt, Message}, rpc, }; -use log::{debug, info, warn}; +use log::{debug, info, trace, warn}; use once_cell::sync::Lazy; use regex::Regex; @@ -69,15 +69,16 @@ where _ => Ok(false), } } - // Handle an outgoing MUXRPC response. + // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { self.recv_rpc_response(api, ch_broker, *req_no, res).await } - // Handle an outgoing MUXRPC 'cancel stream' response. + // Handle an incoming MUXRPC 'cancel stream' response. RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamRespose()) => { + trace!(target: "history-stream-handler", "matched cancel stream response with req no: {}", req_no); self.recv_cancelstream(api, *req_no).await } - // Handle an outgoing MUXRPC error response. + // Handle an incoming MUXRPC error response. RpcInput::Network(req_no, rpc::RecvMsg::ErrorResponse(err)) => { self.recv_error_response(api, *req_no, err).await } @@ -293,11 +294,20 @@ where /// Close the stream and remove the public key of the peer from the list /// of active streams (`reqs`). async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: i32) -> Result { + trace!(target: "history-stream-handler", "recv_cancelstream called for req no: {}", req_no); if let Some(key) = self.find_key_by_req_no(req_no) { + trace!(target: "history-stream-handler", "key found for req no: {}", req_no); + + trace!(target: "history-stream-handler", "sending eof rpc message to peer for req no: {}", req_no); api.rpc().send_stream_eof(-req_no).await?; + + trace!(target: "history-stream-handler", "removing {} from list of active peers", key); self.reqs.remove(&key); + + trace!(target: "history-stream-handler", "cancel stream event has been handled; returning true"); Ok(true) } else { + trace!(target: "history-stream-handler", "no key found for req no: {}", req_no); Ok(false) } } From bc6836ecb2c00b66d4dfea92ddb6efd9fa4dc6f8 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 2 Feb 2023 10:13:01 +0200 Subject: [PATCH 5/8] add test for feed query response length --- src/storage/kv.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/storage/kv.rs b/src/storage/kv.rs index 3d382ef..75f2e10 100644 --- a/src/storage/kv.rs +++ b/src/storage/kv.rs @@ -401,6 +401,12 @@ mod test { // and signed message. assert_eq!(msg_val.unwrap(), msg_2_clone); + // Get all messages comprising the feed. + let feed = kv.get_feed(&keypair.id).unwrap(); + + // Ensure that two messages are returned. + assert_eq!(feed.len(), 2); + Ok(()) } From cb0af6bc5d12cde161a8c9c26e48e7d50cffde49 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 8 Feb 2023 08:29:58 +0200 Subject: [PATCH 6/8] implement conn manager methods and tests --- src/actors/connection_manager.rs | 219 ++++++++++++++++--------- src/actors/peer.rs | 264 ++++++++++++++++++------------- src/actors/rpc/history_stream.rs | 14 +- src/actors/tcp_server.rs | 4 - src/main.rs | 2 +- 5 files changed, 302 insertions(+), 201 deletions(-) diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs index 27d088c..c193cd6 100644 --- a/src/actors/connection_manager.rs +++ b/src/actors/connection_manager.rs @@ -1,5 +1,12 @@ -use async_std::{sync::Mutex, task, task::JoinHandle}; +use std::collections::HashSet; + +use async_std::{ + sync::{Arc, RwLock}, + task, + task::JoinHandle, +}; use futures::{select_biased, stream::StreamExt, FutureExt}; +use kuska_ssb::crypto::ed25519; use log::trace; use once_cell::sync::Lazy; @@ -46,18 +53,25 @@ pub enum ConnectionEvent { /// Connection manager (broker). #[derive(Debug)] pub struct ConnectionManager { + /// The public keys of all peers to whom we are currently connected. + pub connected_peers: HashSet, + /// Idle connection timeout limit. + pub idle_timeout_limit: u8, /// ID number of the most recently registered connection. last_connection_id: usize, /// Message loop handle. msgloop: Option>, - //active_connections: usize, + // TODO: keep a list of active connections. + // Then we can query total active connections using `.len()`. + //active_connections: HashSet, } -pub static CONNECTION_MANAGER: Lazy> = - Lazy::new(|| Mutex::new(ConnectionManager::new())); +/// The connection manager for the solar node. +pub static CONNECTION_MANAGER: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(ConnectionManager::new()))); impl ConnectionManager { - /// Instantiate a new `ConnectionManager` instance. + /// Instantiate a new `ConnectionManager`. pub fn new() -> Self { // Spawn the connection event message loop. let msgloop = task::spawn(Self::msg_loop()); @@ -65,9 +79,37 @@ impl ConnectionManager { Self { last_connection_id: 0, msgloop: Some(msgloop), + idle_timeout_limit: 30, + connected_peers: HashSet::new(), } } + /// Query the number of active peer connections. + pub fn count_connections(&self) -> usize { + self.connected_peers.len() + } + + /// Query whether the list of connected peers contains the given peer. + /// Returns `true` if the peer is in the list, otherwise a `false` value is + /// returned. + pub fn contains_connected_peer(&self, peer_id: &ed25519::PublicKey) -> bool { + self.connected_peers.contains(peer_id) + } + + /// Add a peer to the list of connected peers. + /// Returns `true` if the peer was not already in the list, otherwise a + /// `false` value is returned. + pub fn insert_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { + self.connected_peers.insert(peer_id) + } + + /// Remove a peer from the list of connected peers. + /// Returns `true` if the peer was in the list, otherwise a `false` value + /// is returned. + pub fn remove_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { + self.connected_peers.remove(&peer_id) + } + /// Return a handle for the connection event message loop. pub fn take_msgloop(&mut self) -> JoinHandle<()> { self.msgloop.take().unwrap() @@ -78,7 +120,7 @@ impl ConnectionManager { // Increment the last connection ID value. self.last_connection_id += 1; - trace!(target: "connection-manager", "registering new connection: {}", self.last_connection_id); + trace!(target: "connection-manager", "registered new connection: {}", self.last_connection_id); self.last_connection_id } @@ -149,79 +191,108 @@ impl ConnectionManager { } } -/* +#[cfg(test)] +mod test { + use super::*; + + use crate::{config::SecretConfig, Result}; + + // A helper function to instantiate a new connection manager for each test. + // + // If we don't use this approach and simply use CONNECTION_MANAGER + // instead, we end up sharing state across tests (since they all end up + // operating on the same instance). + fn instantiate_new_connection_manager() -> Lazy>> { + Lazy::new(|| Arc::new(RwLock::new(ConnectionManager::new()))) + } + + #[async_std::test] + async fn test_connection_manager_defaults() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + let connection_idle_timeout_limit = connection_manager.read().await.idle_timeout_limit; + assert_eq!(connection_idle_timeout_limit, 30); -// LEFT-OVER CODE FROM ACTOR APPROCH + let last_connection_id = connection_manager.read().await.last_connection_id; + assert_eq!(last_connection_id, 0); -// TODO: remove this code when certain of the msgloop approach used above + let msgloop = &connection_manager.read().await.msgloop; + assert!(msgloop.is_some()); -pub async fn actor() -> Result<()> { - if let Err(err) = actor_inner().await { - warn!("connection manager failed: {}", err); + let connected_peers = &connection_manager.read().await.connected_peers; + assert!(connected_peers.is_empty()); + + Ok(()) } - Ok(()) -} -pub async fn actor_inner() -> Result<()> { - // Register the connection manager actor with the broker. - let ActorEndpoint { - ch_terminate, - ch_broker: _, - ch_msg, - actor_id: _, - .. - } = BROKER - .lock() - .await - .register("connection-manager", true) - .await?; - - let _connection_manager = ConnectionManager::new(); - - // Fuse internal termination channel with external channel. - // This allows termination of the peer loop to be initiated from outside - // this function. - let mut ch_terminate_fuse = ch_terminate.fuse(); - - let mut broker_msg_ch = ch_msg.unwrap(); - - // Listen for connection events via the broker message bus. - loop { - select_biased! { - _value = ch_terminate_fuse => { - break; - }, - msg = broker_msg_ch.next().fuse() => { - if let Some(msg) = msg { - if let Some(conn_event) = msg.downcast_ref::() { - match conn_event { - ConnectionEvent::Connecting(id) => { - trace!(target: "connection-manager", "Connecting: {}", id); - } - ConnectionEvent::Handshaking(id) => { - trace!(target: "connection-manager", "Handshaking: {}", id); - } - ConnectionEvent::Connected(id) => { - trace!(target: "connection-manager", "Connected: {}", id); - } ConnectionEvent::Replicating(id) => { - trace!(target: "connection-manager", "Replicating: {}", id); - } - ConnectionEvent::Disconnecting(id) => { - trace!(target: "connection-manager", "Disconnecting: {}", id); - } - ConnectionEvent::Disconnected(id) => { - trace!(target: "connection-manager", "Disconnected: {}", id); - } - ConnectionEvent::Error(id, err) => { - trace!(target: "connection-manager", "Error: {}: {}", id, err); - } - } - } - } - }, - }; + #[async_std::test] + async fn test_register_new_connection() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + for i in 1..=4 { + // Register a new connection. + let connection_id = connection_manager.write().await.register(); + + // Ensure the connection ID is incremented for each new connection. + assert_eq!(connection_id, i as usize); + } + + Ok(()) + } + + #[async_std::test] + async fn test_count_connections() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + let active_connections = connection_manager.read().await.count_connections(); + assert_eq!(active_connections, 0); + + Ok(()) } - Ok(()) + #[async_std::test] + async fn test_connected_peers() -> Result<()> { + let connection_manager = instantiate_new_connection_manager(); + + // Create a unique keypair to sign messages. + let keypair = SecretConfig::create().owned_identity().unwrap(); + + // Insert a new connected peer. + let insert_result = connection_manager + .write() + .await + .insert_connected_peer(keypair.pk); + assert_eq!(insert_result, true); + + // Query the list of connected peers. + let query_result = connection_manager + .read() + .await + .contains_connected_peer(&keypair.pk); + assert_eq!(query_result, true); + + // Attempt to insert the same peer ID for a second time. + let reinsert_result = connection_manager + .write() + .await + .insert_connected_peer(keypair.pk); + assert_eq!(reinsert_result, false); + + // Count the active connections. + let connections = connection_manager.read().await.count_connections(); + assert_eq!(connections, 1); + + // Remove a peer from the list of connected peers. + let remove_result = connection_manager + .write() + .await + .remove_connected_peer(keypair.pk); + assert_eq!(remove_result, true); + + // Count the active connections. + let conns = connection_manager.read().await.count_connections(); + assert_eq!(conns, 0); + + Ok(()) + } } -*/ diff --git a/src/actors/peer.rs b/src/actors/peer.rs index 3e18d0b..8fb8a91 100644 --- a/src/actors/peer.rs +++ b/src/actors/peer.rs @@ -1,9 +1,8 @@ -use std::{collections::HashSet, net::Shutdown, time::Duration}; +use std::{net::Shutdown, time::Duration}; use async_std::{ io::{Read, Write}, net::TcpStream, - sync::{Arc, RwLock}, task, }; use futures::{pin_mut, select_biased, stream::StreamExt, FutureExt, SinkExt}; @@ -18,7 +17,6 @@ use kuska_ssb::{ rpc::{RpcReader, RpcWriter}, }; use log::{error, info, trace, warn}; -use once_cell::sync::Lazy; use crate::{ actors::{ @@ -44,30 +42,37 @@ pub enum Connect { }, } -// TODO: move this to the connection manager -/// A list (`HashSet`) of public keys representing peers to whom we are -/// currently connected. -pub static CONNECTED_PEERS: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); - pub async fn actor(id: OwnedIdentity, connect: Connect, selective_replication: bool) -> Result<()> { // Register a new connection with the connection manager. - let connection_id = CONNECTION_MANAGER.lock().await.register(); + let connection_id = CONNECTION_MANAGER.write().await.register(); + + // Set the connection idle timeout limit according to the connection + // manager configuration. This value is used to break out of the peer loop + // after n consecutive idle seconds. + let connection_idle_timeout_limit = CONNECTION_MANAGER.read().await.idle_timeout_limit; // Catch any errors which occur during the peer connection and replication. - if let Err(err) = actor_inner(id, connect, selective_replication, connection_id).await { + if let Err(err) = actor_inner( + id, + connect, + selective_replication, + connection_id, + connection_idle_timeout_limit, + ) + .await + { warn!("peer failed: {:?}", err); let mut ch_broker = BROKER.lock().await.create_sender(); - // Send a connection event. - let connecting_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Error(connection_id, err.to_string()), - ); - - // Send connection event message via the broker. - ch_broker.send(connecting_msg).await.unwrap(); + // Send 'error' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + )) + .await + .unwrap(); } Ok(()) @@ -81,6 +86,7 @@ pub async fn actor_inner( connect: Connect, selective_replication: bool, connection_id: usize, + connection_idle_timeout_limit: u8, ) -> Result { // Register the "peer" actor endpoint with the broker. let ActorEndpoint { @@ -97,17 +103,14 @@ pub async fn actor_inner( // Define the network key to be used for the secret handshake. let network_key = NETWORK_KEY.get().unwrap().to_owned(); - // TODO: think of ways to send a directed message instead of broadcast. - // All we need is the actor ID of the connection manager. - // - // Send a connection event. - let connecting_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Connecting(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(connecting_msg).await.unwrap(); + // Send 'connecting' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connecting(connection_id), + )) + .await + .unwrap(); // Handle a TCP connection event (inbound or outbound). let (stream, handshake) = match connect { @@ -117,10 +120,16 @@ pub async fn actor_inner( port, peer_pk, } => { + // TODO: move this check into the scheduler. + // // First check if we are already connected to the selected peer. // If yes, return immediately. // If no, continue with the connection attempt. - if CONNECTED_PEERS.read().await.contains(&peer_pk) { + if CONNECTION_MANAGER + .read() + .await + .contains_connected_peer(&peer_pk) + { return Ok(connection_id); } @@ -129,49 +138,53 @@ pub async fn actor_inner( // Attempt a TCP connection. let mut stream = TcpStream::connect(server_port).await?; - let handshaking_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Handshaking(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(handshaking_msg).await.unwrap(); + // Send 'handshaking' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + )) + .await + .unwrap(); // Attempt a secret handshake. let handshake = handshake_client(&mut stream, network_key, pk, sk, peer_pk).await?; info!("💃 connected to peer {}", handshake.peer_pk.to_ssb_id()); - let connected_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Connected(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(connected_msg).await.unwrap(); + // Send 'connected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + )) + .await + .unwrap(); (stream, handshake) } // Handle an incoming TCP connection event. Connect::ClientStream { mut stream } => { - let handshaking_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Handshaking(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(handshaking_msg).await.unwrap(); + // Send 'handshaking' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Handshaking(connection_id), + )) + .await + .unwrap(); // Attempt a secret handshake. let handshake = handshake_server(&mut stream, network_key, pk, sk).await?; - let connected_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Connected(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(connected_msg).await.unwrap(); + // Send 'connected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Connected(connection_id), + )) + .await + .unwrap(); // Convert the public key to a `String`. let ssb_id = handshake.peer_pk.to_ssb_id(); @@ -186,18 +199,24 @@ pub async fn actor_inner( // Check if we are already connected to the selected peer. // If yes, return immediately. // If no, return the stream and handshake. - if CONNECTED_PEERS.read().await.contains(&handshake.peer_pk) { + if CONNECTION_MANAGER + .read() + .await + .contains_connected_peer(&handshake.peer_pk) + { info!("peer {} is already connected", &peer_pk); // Since we already have an active connection to this peer, // we can disconnect the redundant connection. - let disconnecting_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Disconnecting(connection_id), - ); - // Send connection event message via the broker. - ch_broker.send(disconnecting_msg).await.unwrap(); + // Send 'disconnecting' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + )) + .await + .unwrap(); return Ok(connection_id); } @@ -219,13 +238,14 @@ pub async fn actor_inner( peer_pk ); - let disconnecting_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Disconnecting(connection_id), - ); - // Send connection event message via the broker. - ch_broker.send(disconnecting_msg).await.unwrap(); + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnecting(connection_id), + )) + .await + .unwrap(); // This may not be necessary; the connection should close when // the stream is dropped. @@ -241,19 +261,20 @@ pub async fn actor_inner( // Parse the peer public key from the handshake. let peer_pk = handshake.peer_pk; - // TODO: - // Remember to change the placement of this. - // Add the peer to the list of connected peers. - CONNECTED_PEERS.write().await.insert(peer_pk); - - let replicating_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Replicating(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(replicating_msg).await.unwrap(); + CONNECTION_MANAGER + .write() + .await + .insert_connected_peer(peer_pk); + + // Send 'replicating' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Replicating(connection_id), + )) + .await + .unwrap(); // Spawn the peer loop (responsible for negotiating RPC requests). let res = peer_loop( @@ -263,36 +284,40 @@ pub async fn actor_inner( handshake, ch_terminate, ch_msg.unwrap(), + connection_idle_timeout_limit, ) .await; // Remove the peer from the list of connected peers. - CONNECTED_PEERS.write().await.remove(&peer_pk); + CONNECTION_MANAGER + .write() + .await + .remove_connected_peer(peer_pk); if let Err(err) = res { warn!("💀 client terminated with error {:?}", err); // TODO: Use the `ConnectionError` as the type for `err`. // - // Communicate the disconnection to the connection manager. - let error_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Error(connection_id, err.to_string()), - ); - - // Send connection event message via the broker. - ch_broker.send(error_msg).await.unwrap(); + // Send 'error' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Error(connection_id, err.to_string()), + )) + .await + .unwrap(); } else { info!("👋 finished connection with {}", &peer_pk.to_ssb_id()); - // Communicate the disconnection to the connection manager. - let disconnected_msg = BrokerEvent::new( - Destination::Broadcast, - ConnectionEvent::Disconnected(connection_id), - ); - - // Send connection event message via the broker. - ch_broker.send(disconnected_msg).await.unwrap(); + // Send 'disconnected' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + ConnectionEvent::Disconnected(connection_id), + )) + .await + .unwrap(); } let _ = ch_broker.send(BrokerEvent::Disconnect { actor_id }).await; @@ -307,12 +332,11 @@ async fn peer_loop Result<()> { // Parse the peer public key from the handshake. let peer_ssb_id = handshake.peer_pk.to_ssb_id(); - trace!(target: "peer-loop", "initiating peer loop with: {}", peer_ssb_id); - // Instantiate a box stream and split it into reader and writer streams. let (box_stream_read, box_stream_write) = BoxStream::from_handshake(reader, writer, handshake, 0x8000).split_read_write(); @@ -348,9 +372,17 @@ async fn peer_loop { + // Reset the timer counter. + timer_counter = 0; let (rpc_id, packet) = packet; - RpcInput::Network(rpc_id,packet) + RpcInput::Network(rpc_id, packet) }, msg = ch_msg.next().fuse() => { + // Reset the timer counter. + timer_counter = 0; if let Some(msg) = msg { RpcInput::Message(msg) } else { @@ -370,12 +406,18 @@ async fn peer_loop { - RpcInput::Timer - }, + // Break out of the peer loop if the connection idle timeout + // limit has been reached. + if timer_counter >= connection_idle_timeout_limit { + break + } else { + // Increment the timer counter. + timer_counter += 1; + RpcInput::Timer + } + } }; - trace!(target: "peer-loop", "peer loop input: {:?}", input); - let mut handled = false; for handler in handlers.iter_mut() { match handler.handle(&mut api, &input, &mut ch_broker).await { @@ -395,5 +437,7 @@ async fn peer_loop { - trace!(target: "history-stream-handler", "matched cancel stream response with req no: {}", req_no); self.recv_cancelstream(api, *req_no).await } // Handle an incoming MUXRPC error response. @@ -146,7 +145,7 @@ where // Instantiate the history stream request args for the given peer. // The `live` arg means: keep the connection open after initial // replication. - let mut args = dto::CreateHistoryStreamIn::new(peer_pk.to_string()).live(false); + let mut args = dto::CreateHistoryStreamIn::new(peer_pk.to_string()).live(true); // Retrieve the sequence number of the most recent message for // this peer from the local key-value store. @@ -294,20 +293,11 @@ where /// Close the stream and remove the public key of the peer from the list /// of active streams (`reqs`). async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: i32) -> Result { - trace!(target: "history-stream-handler", "recv_cancelstream called for req no: {}", req_no); if let Some(key) = self.find_key_by_req_no(req_no) { - trace!(target: "history-stream-handler", "key found for req no: {}", req_no); - - trace!(target: "history-stream-handler", "sending eof rpc message to peer for req no: {}", req_no); api.rpc().send_stream_eof(-req_no).await?; - - trace!(target: "history-stream-handler", "removing {} from list of active peers", key); self.reqs.remove(&key); - - trace!(target: "history-stream-handler", "cancel stream event has been handled; returning true"); Ok(true) } else { - trace!(target: "history-stream-handler", "no key found for req no: {}", req_no); Ok(false) } } diff --git a/src/actors/tcp_server.rs b/src/actors/tcp_server.rs index 0db9339..f36294f 100644 --- a/src/actors/tcp_server.rs +++ b/src/actors/tcp_server.rs @@ -23,12 +23,8 @@ pub async fn actor( select_biased! { _ = ch_terminate => break, stream = incoming.next().fuse() => { - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Connecting if let Some(stream) = stream { if let Ok(stream) = stream { - // TODO: pass message to the broker for the connection manager: - // ConnectionEvent::Connected Broker::spawn(super::peer::actor(server_id.clone(), super::peer::Connect::ClientStream{stream}, selective_replication)); } } else { diff --git a/src/main.rs b/src/main.rs index a37db8a..c411098 100644 --- a/src/main.rs +++ b/src/main.rs @@ -106,7 +106,7 @@ async fn main() -> Result<()> { } // Spawn the connection manager message loop. - let connection_manager_msgloop = CONNECTION_MANAGER.lock().await.take_msgloop(); + let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop(); connection_manager_msgloop.await; // Spawn the broker message loop. From eb60adf125ff144d0e0aca16f0b8b93b60aaac1a Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 10 May 2023 13:28:40 +0200 Subject: [PATCH 7/8] satisfy clippy --- src/actors/connection_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs index c193cd6..3939fdd 100644 --- a/src/actors/connection_manager.rs +++ b/src/actors/connection_manager.rs @@ -85,7 +85,7 @@ impl ConnectionManager { } /// Query the number of active peer connections. - pub fn count_connections(&self) -> usize { + pub fn _count_connections(&self) -> usize { self.connected_peers.len() } From c19adcbb4d4459e7013f9f021480faabb66de98d Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 10 May 2023 13:35:32 +0200 Subject: [PATCH 8/8] add underscore to count_connections method in tests --- src/actors/connection_manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/actors/connection_manager.rs b/src/actors/connection_manager.rs index 3939fdd..ecfd2fb 100644 --- a/src/actors/connection_manager.rs +++ b/src/actors/connection_manager.rs @@ -244,7 +244,7 @@ mod test { async fn test_count_connections() -> Result<()> { let connection_manager = instantiate_new_connection_manager(); - let active_connections = connection_manager.read().await.count_connections(); + let active_connections = connection_manager.read().await._count_connections(); assert_eq!(active_connections, 0); Ok(()) @@ -279,7 +279,7 @@ mod test { assert_eq!(reinsert_result, false); // Count the active connections. - let connections = connection_manager.read().await.count_connections(); + let connections = connection_manager.read().await._count_connections(); assert_eq!(connections, 1); // Remove a peer from the list of connected peers. @@ -290,7 +290,7 @@ mod test { assert_eq!(remove_result, true); // Count the active connections. - let conns = connection_manager.read().await.count_connections(); + let conns = connection_manager.read().await._count_connections(); assert_eq!(conns, 0); Ok(())