Skip to content

Commit

Permalink
Add connection type discrimination for connected-peers protocol. (#2279)
Browse files Browse the repository at this point in the history
* networking: Add connection type to connected-peers protocol.

* networking: Add connected-peers test.

* networking: Update connected-peers protocol.
  • Loading branch information
shamil-gadelshin authored Nov 30, 2023
1 parent 6bda2af commit 184f45b
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 22 deletions.
132 changes: 111 additions & 21 deletions crates/subspace-networking/src/protocols/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::utils::PeerAddress;
use futures::FutureExt;
use futures_timer::Delay;
use handler::Handler;
use libp2p::core::{Endpoint, Multiaddr};
use libp2p::core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
Expand All @@ -46,6 +46,30 @@ use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use tracing::{debug, trace};

#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub(super) enum ConnectionType {
Outgoing,
Incoming,
}

impl From<&ConnectedPoint> for ConnectionType {
fn from(value: &ConnectedPoint) -> Self {
match value {
ConnectedPoint::Dialer { .. } => ConnectionType::Outgoing,
ConnectedPoint::Listener { .. } => ConnectionType::Incoming,
}
}
}

impl ConnectionType {
fn stringify(&self) -> String {
match self {
Self::Outgoing => "Outgoing".to_string(),
Self::Incoming => "Incoming".to_string(),
}
}
}

/// Represents different states of a peer permanent connection.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConnectionState {
Expand All @@ -57,11 +81,17 @@ enum ConnectionState {

/// We're waiting for a decision for some time. The decision time is limited by the
/// connection timeout.
Deciding { connection_id: ConnectionId },
Deciding {
connection_id: ConnectionId,
connection_type: ConnectionType,
},

/// Indicates that the decision has been made to maintain a permanent
/// connection with the peer. No further decision-making is required for this state.
Permanent { connection_id: ConnectionId },
Permanent {
connection_id: ConnectionId,
connection_type: ConnectionType,
},

/// Shows that the system has decided not to connect with the peer.
/// No further decision-making is required for this state.
Expand All @@ -78,21 +108,42 @@ impl ConnectionState {
fn connection_id(&self) -> Option<ConnectionId> {
match self {
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => None,
ConnectionState::Deciding { connection_id } => Some(*connection_id),
ConnectionState::Permanent { connection_id } => Some(*connection_id),
ConnectionState::Deciding { connection_id, .. } => Some(*connection_id),
ConnectionState::Permanent { connection_id, .. } => Some(*connection_id),
ConnectionState::NotInterested => None,
}
}

/// Returns active connection type if any.
fn connection_type(&self) -> Option<ConnectionType> {
match self {
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => None,
ConnectionState::Deciding {
connection_type, ..
} => Some(*connection_type),
ConnectionState::Permanent {
connection_type, ..
} => Some(*connection_type),
ConnectionState::NotInterested => None,
}
}

/// Converts [`ConnectionState`] to a string with information loss.
fn stringify(&self) -> String {
match self {
let type_part = self
.connection_type()
.map(|conn_type| conn_type.stringify())
.unwrap_or("None".to_string());

let state_part = match self {
ConnectionState::Preparing { .. } => "ToConnect".to_string(),
ConnectionState::Connecting { .. } => "Connecting".to_string(),
ConnectionState::Deciding { .. } => "Deciding".to_string(),
ConnectionState::Permanent { .. } => "Permanent".to_string(),
ConnectionState::NotInterested => "NotInterested".to_string(),
}
};

format!("{0}:{1}", type_part, state_part)
}
}

Expand Down Expand Up @@ -199,14 +250,22 @@ impl<Instance> Behaviour<Instance> {
}

/// Create a connection handler for the protocol.
fn new_connection_handler(&mut self, peer_id: &PeerId, connection_id: ConnectionId) -> Handler {
fn new_connection_handler(
&mut self,
peer_id: &PeerId,
connection_id: ConnectionId,
connection_type: ConnectionType,
) -> Handler {
let default_keep_alive_until = Instant::now() + self.config.decision_timeout;
let (keep_alive, keep_alive_until) =
if let Some(connection_state) = self.known_peers.get_mut(peer_id) {
match connection_state {
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => {
// Connection attempt was successful.
*connection_state = ConnectionState::Deciding { connection_id };
*connection_state = ConnectionState::Deciding {
connection_id,
connection_type,
};

(true, Some(default_keep_alive_until))
}
Expand All @@ -216,8 +275,13 @@ impl<Instance> Behaviour<Instance> {
}
} else {
// Connection from other protocols.
self.known_peers
.insert(*peer_id, ConnectionState::Deciding { connection_id });
self.known_peers.insert(
*peer_id,
ConnectionState::Deciding {
connection_id,
connection_type,
},
);

(true, Some(default_keep_alive_until))
};
Expand All @@ -229,13 +293,29 @@ impl<Instance> Behaviour<Instance> {
/// Specifies the whether we should keep connections to the peer alive. The decision could
/// depend on another protocol (e.g.: PeerInfo protocol event handling).
pub fn update_keep_alive_status(&mut self, peer_id: PeerId, keep_alive: bool) {
let not_enough_connected_peers =
self.permanently_connected_peers() < self.config.max_connected_peers;
let current_connected_peers_number = self.permanently_connected_peers();

// It's a known peer.
if let Some(connection_state) = self.known_peers.get_mut(&peer_id) {
// We're connected
if let Some(connection_id) = connection_state.connection_id() {
let Some(connection_type) = connection_state.connection_type() else {
debug!(
?peer_id,
?keep_alive,
"Detected an attempt to update status of peer with unknown connection type."
);
return;
};

// We allow new permanent outgoing connections when we have total connections less
// than our desired (target) connection number or new permanent incoming
// connections when we have total connections less than the maximum allowed number.
let not_enough_connected_peers = current_connected_peers_number
< self.config.target_connected_peers
|| (connection_type == ConnectionType::Incoming
&& current_connected_peers_number < self.config.max_connected_peers);

if not_enough_connected_peers {
trace!(%peer_id, %keep_alive, "Insufficient number of connected peers detected.");
} else {
Expand All @@ -245,7 +325,13 @@ impl<Instance> Behaviour<Instance> {
// Check whether we have enough connected peers already and a positive decision
let (new_connection_state, keep_alive_handler) =
if not_enough_connected_peers && keep_alive {
(ConnectionState::Permanent { connection_id }, true)
(
ConnectionState::Permanent {
connection_id,
connection_type,
},
true,
)
} else {
(ConnectionState::NotInterested, false)
};
Expand Down Expand Up @@ -313,19 +399,19 @@ impl<Instance> Behaviour<Instance> {
let status_count =
self.known_peers
.iter()
.fold(HashMap::new(), |mut result, (_, connection_state)| {
.fold(HashMap::new(), |mut result, (_, connection_info)| {
result
.entry(connection_state.stringify())
.entry(connection_info.stringify())
.and_modify(|count| *count += 1)
.or_insert(1);
result
});

let peer_status = self.known_peers.iter().fold(
HashMap::<String, Vec<PeerId>>::new(),
|mut result, (peer_id, connection_state)| {
|mut result, (peer_id, connection_info)| {
result
.entry(connection_state.stringify())
.entry(connection_info.stringify())
.and_modify(|peers| peers.push(*peer_id))
.or_insert(vec![*peer_id]);
result
Expand All @@ -350,7 +436,7 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(self.new_connection_handler(&peer_id, connection_id))
Ok(self.new_connection_handler(&peer_id, connection_id, ConnectionType::Incoming))
}

fn handle_established_outbound_connection(
Expand All @@ -360,20 +446,24 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(self.new_connection_handler(&peer_id, connection_id))
Ok(self.new_connection_handler(&peer_id, connection_id, ConnectionType::Outgoing))
}

fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
}) => {
match self.known_peers.entry(peer_id) {
// Connection was established without dialing from this protocol
Entry::Vacant(entry) => {
entry.insert(ConnectionState::Deciding { connection_id });
entry.insert(ConnectionState::Deciding {
connection_id,
connection_type: endpoint.into(),
});

trace!(%peer_id, "Pending peer decision...");
self.wake();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn test_target_connected_peer_limit_number() {
let mut peer1 = new_ephemeral(
DECISION_TIMEOUT,
Behaviour::<ConnectedPeersInstance>::new(Config {
target_connected_peers: 0,
target_connected_peers: 1,
max_connected_peers,
decision_timeout: DECISION_TIMEOUT,
..Default::default()
Expand Down Expand Up @@ -287,3 +287,75 @@ fn new_ephemeral<NB: NetworkBehaviour>(connection_timeout: Duration, behaviour:
.unwrap()
.build()
}

#[tokio::test()]
async fn test_connection_type_difference() {
let mut peer1 = new_ephemeral(
DECISION_TIMEOUT,
Behaviour::<ConnectedPeersInstance>::new(Config {
target_connected_peers: 0,
max_connected_peers: 1,
decision_timeout: DECISION_TIMEOUT,
..Default::default()
}),
);

let mut peer2 = new_ephemeral(
DECISION_TIMEOUT,
Behaviour::<ConnectedPeersInstance>::new(Config {
target_connected_peers: 1,
max_connected_peers: 1,
decision_timeout: DECISION_TIMEOUT,
..Default::default()
}),
);

peer1.listen().with_memory_addr_external().await;
peer2.listen().with_memory_addr_external().await;

peer1.connect(&mut peer2).await;

peer1
.behaviour_mut()
.update_keep_alive_status(*peer2.local_peer_id(), true);
peer2
.behaviour_mut()
.update_keep_alive_status(*peer1.local_peer_id(), true);

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
break;
}
}
}

// Peer1 doesn't have enough slots for outgoing connections
assert!(!peer1.is_connected(peer2.local_peer_id()));
assert!(!peer2.is_connected(peer1.local_peer_id()));

peer2.connect(&mut peer1).await;

peer1
.behaviour_mut()
.update_keep_alive_status(*peer2.local_peer_id(), true);
peer2
.behaviour_mut()
.update_keep_alive_status(*peer1.local_peer_id(), true);

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
break;
}
}
}

// Peer2 has enough slots for outgoing connections
assert!(peer1.is_connected(peer2.local_peer_id()));
assert!(peer2.is_connected(peer1.local_peer_id()));
}

0 comments on commit 184f45b

Please sign in to comment.