Skip to content

Commit

Permalink
create NetworkGlobalsWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jan 24, 2025
1 parent 6d04036 commit 7b58820
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 101 deletions.
114 changes: 16 additions & 98 deletions beacon_node/lighthouse_network/src/peer_manager/connectivity.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,10 @@
use crate::{EnrExt, NetworkGlobals, PeerId};
use crate::peer_manager::network_globals_wrapper::NetworkGlobalsWrapper;
use crate::EnrExt;
use discv5::Enr;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use types::EthSpec;

pub trait NetworkGlobalsConnectivity {
/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize;

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize;

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant);

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool;
}

pub struct LHNetworkGlobalsConnectivity<E: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
network_globals: Arc<NetworkGlobals<E>>,
}

impl<E: EthSpec> LHNetworkGlobalsConnectivity<E> {
pub fn new(network_globals: Arc<NetworkGlobals<E>>) -> Self {
Self { network_globals }
}

/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize {
self.network_globals.connected_outbound_only_peers()
}

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize {
self.network_globals.connected_or_dialing_peers()
}

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant) {
self.network_globals
.peers
.write()
.update_min_ttl(peer_id, min_ttl);
}

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool {
self.network_globals.peers.read().should_dial(peer_id)
}
}

impl<E: EthSpec> NetworkGlobalsConnectivity for LHNetworkGlobalsConnectivity<E> {
/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize {
self.connected_outbound_only_peers()
}

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize {
self.connected_or_dialing_peers()
}

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant) {
self.update_min_ttl(peer_id, min_ttl)
}

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool {
self.should_dial(peer_id)
}
}

pub struct Connectivity<N: NetworkGlobalsConnectivity> {
pub struct Connectivity<N: NetworkGlobalsWrapper> {
target_peers: usize,
peer_excess_factor: f32,
priority_peer_excess: f32,
Expand All @@ -88,10 +14,10 @@ pub struct Connectivity<N: NetworkGlobalsConnectivity> {
discovery_enabled: bool,
/// Peers queued to be dialed.
peers_to_dial: Vec<Enr>,
network_globals_connectivity: N,
network_globals_wrapper: N,
}

impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
impl<N: NetworkGlobalsWrapper> Connectivity<N> {
pub fn new(
target_peers: usize,
peer_excess_factor: f32,
Expand All @@ -109,17 +35,14 @@ impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
target_outbound_only_factor,
discovery_enabled,
peers_to_dial: Default::default(),
network_globals_connectivity,
network_globals_wrapper: network_globals_connectivity,
}
}

/// A peer is being dialed.
/// Returns true, if this peer will be dialed.
pub fn dial_peer(&mut self, peer: Enr) -> bool {
if self
.network_globals_connectivity
.should_dial(&peer.peer_id())
{
if self.network_globals_wrapper.should_dial(&peer.peer_id()) {
self.peers_to_dial.push(peer);
true
} else {
Expand All @@ -134,9 +57,7 @@ impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
pub fn peers_discovered(&mut self, results: &HashMap<Enr, Option<Instant>>) -> usize {
let mut to_dial_peers = 0;
let results_count = results.len();
let connected_or_dialing = self
.network_globals_connectivity
.connected_or_dialing_peers();
let connected_or_dialing = self.network_globals_wrapper.connected_or_dialing_peers();
for (enr, min_ttl) in results {
// There are two conditions in deciding whether to dial this peer.
// 1. If we are less than our max connections. Discovery queries are executed to reach
Expand All @@ -155,7 +76,7 @@ impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
// dialed
let peer_id = enr.peer_id();
if let Some(min_ttl) = min_ttl {
self.network_globals_connectivity
self.network_globals_wrapper
.update_min_ttl(&peer_id, *min_ttl);
}
if self.dial_peer(enr.clone()) {
Expand Down Expand Up @@ -191,17 +112,15 @@ impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
/// query if we need to find more peers to maintain the current number of peers
pub fn maintain_peer_count(&mut self, dialing_peers: usize) -> usize
where
N: NetworkGlobalsConnectivity,
N: NetworkGlobalsWrapper,
{
// Check if we need to do a discovery lookup
if self.discovery_enabled {
let peer_count = self
.network_globals_connectivity
.connected_or_dialing_peers();
let outbound_only_peer_count = self
.network_globals_connectivity
.connected_outbound_only_peers();
let wanted_peers = if peer_count < self.target_peers.saturating_sub(dialing_peers) {
let peer_count = self.network_globals_wrapper.connected_or_dialing_peers();
let outbound_only_peer_count =
self.network_globals_wrapper.connected_outbound_only_peers();
// return wanted number of peers
if peer_count < self.target_peers.saturating_sub(dialing_peers) {
// We need more peers in general.
self.max_peers().saturating_sub(dialing_peers) - peer_count
} else if outbound_only_peer_count < self.min_outbound_only_peers()
Expand All @@ -212,8 +131,7 @@ impl<N: NetworkGlobalsConnectivity> Connectivity<N> {
.saturating_sub(peer_count)
} else {
0
};
wanted_peers
}
} else {
0
}
Expand Down
8 changes: 5 additions & 3 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ pub use libp2p::identity::Keypair;

pub mod peerdb;

use crate::peer_manager::connectivity::{Connectivity, LHNetworkGlobalsConnectivity};
use crate::peer_manager::connectivity::Connectivity;
use crate::peer_manager::network_globals_wrapper::LHNetworkGlobalsWrapper;
use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{
Expand All @@ -42,6 +43,7 @@ use types::data_column_custody_group::{
pub mod config;
mod connectivity;
mod network_behaviour;
mod network_globals_wrapper;

/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
Expand Down Expand Up @@ -117,7 +119,7 @@ pub struct PeerManager<E: EthSpec> {
/// The logger associated with the `PeerManager`.
log: slog::Logger,

connectivity: Connectivity<LHNetworkGlobalsConnectivity<E>>,
connectivity: Connectivity<LHNetworkGlobalsWrapper<E>>,
}

/// The events that the `PeerManager` outputs (requests).
Expand Down Expand Up @@ -204,7 +206,7 @@ impl<E: EthSpec> PeerManager<E> {
MIN_OUTBOUND_ONLY_FACTOR,
TARGET_OUTBOUND_ONLY_FACTOR,
discovery_enabled,
LHNetworkGlobalsConnectivity::new(network_globals),
LHNetworkGlobalsWrapper::new(network_globals),
),
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::{NetworkGlobals, PeerId};
use std::sync::Arc;
use std::time::Instant;
use types::EthSpec;

pub trait NetworkGlobalsWrapper {
/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize;

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize;

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant);

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool;
}

pub struct LHNetworkGlobalsWrapper<E: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
network_globals: Arc<NetworkGlobals<E>>,
}

impl<E: EthSpec> LHNetworkGlobalsWrapper<E> {
pub fn new(network_globals: Arc<NetworkGlobals<E>>) -> Self {
Self { network_globals }
}

/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize {
self.network_globals.connected_outbound_only_peers()
}

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize {
self.network_globals.connected_or_dialing_peers()
}

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant) {
self.network_globals
.peers
.write()
.update_min_ttl(peer_id, min_ttl);
}

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool {
self.network_globals.peers.read().should_dial(peer_id)
}
}

impl<E: EthSpec> NetworkGlobalsWrapper for LHNetworkGlobalsWrapper<E> {
/// Returns the number of libp2p connected peers with outbound-only connections.
fn connected_outbound_only_peers(&self) -> usize {
self.connected_outbound_only_peers()
}

/// Returns the number of libp2p peers that are either connected or being dialed.
fn connected_or_dialing_peers(&self) -> usize {
self.connected_or_dialing_peers()
}

/// Update min ttl of a peer.
fn update_min_ttl(&self, peer_id: &PeerId, min_ttl: Instant) {
self.update_min_ttl(peer_id, min_ttl)
}

/// Returns true if the peer should be dialed. This checks the connection state and the
/// score state and determines if the peer manager should dial this peer.
fn should_dial(&self, peer_id: &PeerId) -> bool {
self.should_dial(peer_id)
}
}

0 comments on commit 7b58820

Please sign in to comment.