From b744fbaa5c83cd3eb62653a2f2f942bf219d426c Mon Sep 17 00:00:00 2001 From: shamil-gadelshin Date: Wed, 22 Nov 2023 20:48:43 +0700 Subject: [PATCH] Remove obsolete addresses from subspace-bootstrap-node's DHT routing table. (#2256) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * networking: Introduce minor refactoring and exporting types. * networking: Add networking manager to bootstrap-node. * networking: Rename NetworkingParametersRegistry to KnownPeersRegistry. - rename NetworkingParametersManager from subspace-bootstrap-node. * networking: Make KnownPeersRegistry mandatoryl * networking: Move KnownPeersRegistry into a separate file. * Refactor known peers registry. * networking: Refactor bootstrap-node’s known peers registry. * networking: Refactor KnownPeersManager. --- .../bin/subspace-farmer/commands/farm/dsn.rs | 21 +- crates/subspace-farmer/src/lib.rs | 4 + .../subspace-farmer/src/single_disk_farm.rs | 5 +- .../src/behavior/persistent_parameters.rs | 217 ++++++++++++------ .../subspace-networking/src/behavior/tests.rs | 44 +++- .../src/bin/subspace-bootstrap-node/main.rs | 32 ++- crates/subspace-networking/src/constructor.rs | 13 +- crates/subspace-networking/src/lib.rs | 4 +- crates/subspace-networking/src/node_runner.rs | 21 +- crates/subspace-networking/src/utils.rs | 4 +- crates/subspace-node/src/bin/subspace-node.rs | 4 +- crates/subspace-service/src/dsn.rs | 52 +++-- 12 files changed, 300 insertions(+), 121 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index d3340b74b5..987b9f6a44 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::sync::{Arc, Weak}; use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; -use subspace_farmer::{NodeClient, NodeRpcClient}; +use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE}; use subspace_networking::libp2p::identity::Keypair; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::libp2p::metrics::Metrics; @@ -14,9 +14,10 @@ use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::strip_peer_id; use subspace_networking::{ - construct, Config, KademliaMode, NetworkingParametersManager, Node, NodeRunner, PeerInfo, - PeerInfoProvider, PieceByIndexRequest, PieceByIndexRequestHandler, PieceByIndexResponse, - SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, + construct, Config, KademliaMode, KnownPeersManager, KnownPeersManagerConfig, Node, NodeRunner, + PeerInfo, PeerInfoProvider, PieceByIndexRequest, PieceByIndexRequestHandler, + PieceByIndexResponse, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, + SegmentHeaderResponse, }; use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST; use tracing::{debug, error, info, Instrument}; @@ -50,13 +51,15 @@ pub(super) fn configure_dsn( piece_cache: PieceCache, initialize_metrics: bool, ) -> Result<(Node, NodeRunner, Registry), anyhow::Error> { - let networking_parameters_registry = NetworkingParametersManager::new( - &base_path.join("known_addresses.bin"), - strip_peer_id(bootstrap_nodes.clone()) + let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig { + path: Some(base_path.join("known_addresses.bin").into_boxed_path()), + ignore_peer_list: strip_peer_id(bootstrap_nodes.clone()) .into_iter() .map(|(peer_id, _)| peer_id) .collect::>(), - ) + cache_size: KNOWN_PEERS_CACHE_SIZE, + ..Default::default() + }) .map(Box::new)?; // Metrics @@ -73,7 +76,7 @@ pub(super) fn configure_dsn( reserved_peers, listen_on, allow_non_global_addresses_in_dht: enable_private_ips, - networking_parameters_registry: Some(networking_parameters_registry), + networking_parameters_registry, request_response_protocols: vec![ PieceByIndexRequestHandler::create(move |_, &PieceByIndexRequest { piece_index }| { debug!(?piece_index, "Piece request received. Trying cache..."); diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index ae7caea319..8c5443b1c9 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -41,7 +41,11 @@ pub mod reward_signing; pub mod single_disk_farm; pub mod utils; +/// Size of the LRU cache for peers. +pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); + pub use identity::Identity; pub use jsonrpsee; pub use node_client::node_rpc_client::NodeRpcClient; pub use node_client::{Error as RpcClientError, NodeClient}; +use std::num::NonZeroUsize; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 2b00d818ba..0c3b56a879 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -18,6 +18,7 @@ use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; +use crate::KNOWN_PEERS_CACHE_SIZE; use async_lock::RwLock; use derive_more::{Display, From}; use event_listener_primitives::{Bag, HandlerId}; @@ -51,7 +52,7 @@ use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; use subspace_farmer_components::plotting::{PieceGetter, PlottedSector}; use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed}; use subspace_farmer_components::FarmerProtocolInfo; -use subspace_networking::NetworkingParametersManager; +use subspace_networking::KnownPeersManager; use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; use thiserror::Error; @@ -690,7 +691,7 @@ impl SingleDiskFarm { let fixed_space_usage = RESERVED_PLOT_METADATA + RESERVED_FARM_INFO + Identity::file_size() as u64 - + NetworkingParametersManager::file_size() as u64; + + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64; // Calculate how many sectors can fit let target_sector_count = { let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage) diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs index 83f061b0f4..79f10f9d68 100644 --- a/crates/subspace-networking/src/behavior/persistent_parameters.rs +++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs @@ -2,7 +2,7 @@ use crate::utils::{AsyncJoinOnDrop, CollectionBatcher, Handler, HandlerFn, PeerA use async_trait::async_trait; use event_listener_primitives::HandlerId; use fs2::FileExt; -use futures::future::Fuse; +use futures::future::{pending, Fuse}; use futures::FutureExt; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; @@ -13,7 +13,7 @@ use parking_lot::Mutex; use std::collections::HashSet; use std::fs::OpenOptions; use std::io::{Read, Seek, SeekFrom}; -use std::num::NonZeroUsize; +use std::num::{NonZeroU64, NonZeroUsize}; use std::path::Path; use std::pin::Pin; use std::str::FromStr; @@ -30,7 +30,7 @@ use tracing::{debug, error, trace, warn}; type FailureTime = Option; /// Size of the LRU cache for peers. -const PEER_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); +const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); /// Size of the LRU cache for addresses of a single peer ID. const ADDRESSES_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(30).expect("Not zero; qed"); /// Pause duration between network parameters save. @@ -49,8 +49,6 @@ pub struct PeerAddressRemovedEvent { pub peer_id: PeerId, /// Peer address pub address: Multiaddr, - /// No address left in the permanent storage. - pub last_address: bool, } #[derive(Debug, Encode, Decode)] @@ -62,6 +60,7 @@ struct EncodableKnownPeerAddress { #[derive(Debug, Encode, Decode)] struct EncodableKnownPeers { + cache_size: NonZeroU64, timestamp: u64, // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time known_peers: Vec<(Vec, Vec)>, @@ -69,7 +68,10 @@ struct EncodableKnownPeers { impl EncodableKnownPeers { fn into_cache(self) -> LruCache> { - let mut peers_cache = LruCache::new(PEER_CACHE_SIZE); + let mut peers_cache = LruCache::new( + NonZeroUsize::new(self.cache_size.get() as usize) + .expect("Upstream value is NoneZeroUsize"), + ); 'peers: for (peer_id, addresses) in self.known_peers { let mut peer_cache = LruCache::::new(ADDRESSES_CACHE_SIZE); @@ -107,10 +109,15 @@ impl EncodableKnownPeers { peers_cache } - fn from_cache(cache: &LruCache>) -> Self { + fn from_cache( + cache: &LruCache>, + cache_size: NonZeroUsize, + ) -> Self { let single_peer_encoded_address_size = - NetworkingParametersManager::single_peer_encoded_address_size(); + KnownPeersManager::single_peer_encoded_address_size(); Self { + cache_size: NonZeroU64::new(cache_size.get() as u64) + .expect("Getting the value from another NonZero type"), timestamp: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("Never before Unix epoch; qed") @@ -180,7 +187,7 @@ impl KnownPeersSlots { /// Defines operations with the networking parameters. #[async_trait] -pub trait NetworkingParametersRegistry: Send + Sync { +pub trait KnownPeersRegistry: Send + Sync { /// Registers a peer ID and associated addresses async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec); @@ -217,13 +224,13 @@ pub(crate) struct StubNetworkingParametersManager; impl StubNetworkingParametersManager { /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference. - pub fn boxed(self) -> Box { + pub fn boxed(self) -> Box { Box::new(self) } } #[async_trait] -impl NetworkingParametersRegistry for StubNetworkingParametersManager { +impl KnownPeersRegistry for StubNetworkingParametersManager { async fn add_known_peer(&mut self, _: PeerId, _: Vec) {} async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec) {} @@ -247,9 +254,40 @@ impl NetworkingParametersRegistry for StubNetworkingParametersManager { } } +/// Configuration for [`KnownPeersManager`]. +#[derive(Debug, Clone)] +pub struct KnownPeersManagerConfig { + /// Defines whether we return known peers batches on next_known_addresses_batch(). + pub enable_known_peers_source: bool, + /// Defines cache size. + pub cache_size: NonZeroUsize, + /// Peer ID list to filter on address adding. + pub ignore_peer_list: HashSet, + /// Defines whether we enable cache persistence. + pub path: Option>, + /// Defines interval before the next peer address removes entry from the cache. + pub failed_address_cache_removal_interval: Duration, + /// Defines interval before the next peer address removal triggers [`PeerAddressRemovedEvent`]. + pub failed_address_kademlia_removal_interval: Duration, +} + +impl Default for KnownPeersManagerConfig { + fn default() -> Self { + Self { + enable_known_peers_source: true, + cache_size: KNOWN_PEERS_CACHE_SIZE, + ignore_peer_list: Default::default(), + path: None, + failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_SECS, + failed_address_kademlia_removal_interval: + REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS, + } + } +} + /// Networking parameters persistence errors. #[derive(Debug, Error)] -pub enum NetworkParametersPersistenceError { +pub enum KnownPeersManagerPersistenceError { /// I/O error. #[error("I/O error: {0}")] Io(#[from] io::Error), @@ -259,48 +297,54 @@ pub enum NetworkParametersPersistenceError { } /// Handles networking parameters. It manages network parameters set and its persistence. -pub struct NetworkingParametersManager { - // Defines whether the cache requires saving to DB +pub struct KnownPeersManager { + /// Defines whether the cache requires saving to DB cache_need_saving: bool, - // LRU cache for the known peers and their addresses + /// LRU cache for the known peers and their addresses known_peers: LruCache>, - // Period between networking parameters saves. + /// Period between networking parameters saves. networking_parameters_save_delay: Pin>>, /// Slots backed by file that store known peers - known_peers_slots: Arc>, - // Provides batching capabilities for the address collection (it stores the last batch index) + known_peers_slots: Option>>, + /// Provides batching capabilities for the address collection (it stores the last batch index) collection_batcher: CollectionBatcher, - // Event handler triggered when we decide to remove address from the storage. + /// Event handler triggered when we decide to remove address from the storage. address_removed: Handler, - // Peer ID list to filter on address adding. - ignore_peer_list: HashSet, + /// Defines configuration. + config: KnownPeersManagerConfig, } -impl Drop for NetworkingParametersManager { +impl Drop for KnownPeersManager { fn drop(&mut self) { if self.cache_need_saving { - self.known_peers_slots - .lock() - .write_to_inactive_slot(&EncodableKnownPeers::from_cache(&self.known_peers)); + if let Some(known_peers_slots) = &self.known_peers_slots { + known_peers_slots + .lock() + .write_to_inactive_slot(&EncodableKnownPeers::from_cache( + &self.known_peers, + self.config.cache_size, + )); + } } } } -impl NetworkingParametersManager { - /// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter. - /// On object creation it starts a job for networking parameters cache handling. - pub fn new( +impl KnownPeersManager { + fn init_file( path: &Path, - ignore_peer_list: HashSet, - ) -> Result { + cache_size: NonZeroUsize, + ) -> Result< + (Option, Arc>), + KnownPeersManagerPersistenceError, + > { let mut file = OpenOptions::new() .read(true) .write(true) .create(true) .open(path)?; - let known_addresses_size = Self::known_addresses_size(); - let file_size = Self::file_size(); + let known_addresses_size = Self::known_addresses_size(cache_size); + let file_size = Self::file_size(cache_size); // Try reading existing encoded known peers from file let mut maybe_newest_known_addresses = None::; @@ -361,7 +405,7 @@ impl NetworkingParametersManager { // Allocating the whole file (`set_len` below can create a sparse file, which will cause // writes to fail later) file.allocate(file_size as u64) - .map_err(NetworkParametersPersistenceError::CantPreallocateKnownPeersFile)?; + .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?; // Truncating file (if necessary) file.set_len(file_size as u64)?; true @@ -392,24 +436,38 @@ impl NetworkingParametersManager { } } + let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots { + a: a_mmap, + b: b_mmap, + })); + + Ok((maybe_newest_known_addresses, known_peers_slots)) + } + + /// Object constructor. + pub fn new(config: KnownPeersManagerConfig) -> Result { + let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path { + Self::init_file(path, config.cache_size) + .map(|(known_addresses, slots)| (known_addresses, Some(slots)))? + } else { + (None, None) + }; + let known_peers = maybe_newest_known_addresses .map(EncodableKnownPeers::into_cache) - .unwrap_or_else(|| LruCache::new(PEER_CACHE_SIZE)); + .unwrap_or_else(|| LruCache::new(config.cache_size)); Ok(Self { cache_need_saving: false, known_peers, networking_parameters_save_delay: Self::default_delay(), - known_peers_slots: Arc::new(Mutex::new(KnownPeersSlots { - a: a_mmap, - b: b_mmap, - })), + known_peers_slots, collection_batcher: CollectionBatcher::new( NonZeroUsize::new(PEERS_ADDRESSES_BATCH_SIZE) .expect("Manual non-zero initialization failed."), ), address_removed: Default::default(), - ignore_peer_list, + config, }) } @@ -424,13 +482,13 @@ impl NetworkingParametersManager { } /// Size of the backing file on disk - pub fn file_size() -> usize { + pub fn file_size(cache_size: NonZeroUsize) -> usize { // *2 because we have a/b parts of the file - Self::known_addresses_size() * 2 + Self::known_addresses_size(cache_size) * 2 } /// Creates a reference to the `NetworkingParametersRegistry` trait implementation. - pub fn boxed(self) -> Box { + pub fn boxed(self) -> Box { Box::new(self) } @@ -464,20 +522,32 @@ impl NetworkingParametersManager { /// /// NOTE: This is max size that needs to be allocated on disk for successful write of a single /// `known_addresses` copy, the actual written data can occupy only a part of this size - fn known_addresses_size() -> usize { + fn known_addresses_size(cache_size: NonZeroUsize) -> usize { // Timestamp (when was written) + compact encoding of the length of peer records + peer // records + checksum mem::size_of::() - + Compact::compact_len(&(PEER_CACHE_SIZE.get() as u32)) - + Self::single_peer_encoded_size() * PEER_CACHE_SIZE.get() + + Compact::compact_len(&(cache_size.get() as u32)) + + Self::single_peer_encoded_size() * cache_size.get() + mem::size_of::() } + + fn persistent_enabled(&self) -> bool { + self.config.path.is_some() + } + + #[cfg(test)] + pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool { + self.known_peers + .peek(peer_id) + .map(|addresses| addresses.peek(address).is_some()) + .unwrap_or_default() + } } #[async_trait] -impl NetworkingParametersRegistry for NetworkingParametersManager { +impl KnownPeersRegistry for KnownPeersManager { async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec) { - if self.ignore_peer_list.contains(&peer_id) { + if self.config.ignore_peer_list.contains(&peer_id) { debug!( %peer_id, addr_num=addresses.len(), @@ -529,8 +599,8 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { &mut self.known_peers, peer_id, addresses, - REMOVE_KNOWN_PEERS_GRACE_PERIOD_SECS, - REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS, + self.config.failed_address_cache_removal_interval, + self.config.failed_address_kademlia_removal_interval, ); for event in removed_addresses { @@ -549,6 +619,10 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { } async fn next_known_addresses_batch(&mut self) -> Vec { + if !self.config.enable_known_peers_source { + return Vec::new(); + } + // We take cached known addresses and combine them with manually provided bootstrap addresses. let combined_addresses = self.known_addresses().await.into_iter().collect::>(); @@ -561,31 +635,41 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { } fn start_over_address_batching(&mut self) { + if !self.config.enable_known_peers_source { + return; + } + self.collection_batcher.reset(); } async fn run(&mut self) { + if !self.persistent_enabled() { + pending().await + } + loop { (&mut self.networking_parameters_save_delay).await; - if self.cache_need_saving { - let known_peers = EncodableKnownPeers::from_cache(&self.known_peers); - let known_peers_slots = Arc::clone(&self.known_peers_slots); - let write_known_peers_fut = - AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || { - known_peers_slots - .lock() - .write_to_inactive_slot(&known_peers); - })); - - if let Err(error) = write_known_peers_fut.await { - error!(%error, "Failed to write known peers"); - } + if let Some(known_peers_slots) = &self.known_peers_slots { + if self.cache_need_saving { + let known_peers = + EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size); + let known_peers_slots = Arc::clone(known_peers_slots); + let write_known_peers_fut = + AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || { + known_peers_slots + .lock() + .write_to_inactive_slot(&known_peers); + })); + + if let Err(error) = write_known_peers_fut.await { + error!(%error, "Failed to write known peers"); + } - self.cache_need_saving = false; + self.cache_need_saving = false; + } } - - self.networking_parameters_save_delay = NetworkingParametersManager::default_delay(); + self.networking_parameters_save_delay = KnownPeersManager::default_delay(); } } @@ -656,7 +740,6 @@ pub(super) fn remove_known_peer_addresses_internal( let address_removed = PeerAddressRemovedEvent{ peer_id, address: addr.clone(), - last_address }; address_removed_events.push(address_removed); diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index c3c0b84874..34db7acb6d 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -1,6 +1,9 @@ use super::persistent_parameters::remove_known_peer_addresses_internal; use crate::behavior::persistent_parameters::{append_p2p_suffix, remove_p2p_suffix}; -use crate::{Config, GenericRequest, GenericRequestHandler}; +use crate::{ + Config, GenericRequest, GenericRequestHandler, KnownPeersManager, KnownPeersManagerConfig, + KnownPeersRegistry, +}; use futures::channel::oneshot; use futures::future::pending; use libp2p::multiaddr::Protocol; @@ -277,3 +280,42 @@ async fn test_address_p2p_prefix_addition() { assert_eq!(append_p2p_suffix(peer_id, long_addr.clone()), long_addr); assert_eq!(append_p2p_suffix(peer_id, short_addr.clone()), long_addr); } + +#[tokio::test()] +async fn test_known_peers_removal_address_after_specified_interval() { + let config = KnownPeersManagerConfig { + enable_known_peers_source: false, + cache_size: NonZeroUsize::new(100).unwrap(), + ignore_peer_list: Default::default(), + path: None, + failed_address_cache_removal_interval: Duration::from_millis(100), + ..Default::default() + }; + let mut known_peers = KnownPeersManager::new(config).unwrap(); + let peer_id = PeerId::random(); + let mut address = Multiaddr::empty(); + address.push(Protocol::Tcp(10)); + + known_peers + .add_known_peer(peer_id, vec![address.clone()]) + .await; + + // We added address successfully. + assert!(known_peers.contains_address(&peer_id, &address)); + + known_peers + .remove_known_peer_addresses(peer_id, vec![address.clone()]) + .await; + + // We didn't remove address instantly. + assert!(known_peers.contains_address(&peer_id, &address)); + + sleep(Duration::from_millis(110)).await; + + known_peers + .remove_known_peer_addresses(peer_id, vec![address.clone()]) + .await; + + // We removed address after the configured interval. + assert!(!known_peers.contains_address(&peer_id, &address)); +} diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index f15889510a..649db9ca41 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -1,6 +1,6 @@ //! Simple bootstrap node implementation -#![feature(type_changing_struct_update)] +#![feature(const_option, type_changing_struct_update)] use clap::Parser; use futures::{select, FutureExt}; @@ -10,18 +10,30 @@ use libp2p::metrics::Metrics; use libp2p::{identity, Multiaddr, PeerId}; use prometheus_client::registry::Registry; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; +use std::num::NonZeroUsize; use std::sync::Arc; +use std::time::Duration; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::multiaddr::Protocol; -use subspace_networking::{peer_id, Config, KademliaMode}; +use subspace_networking::utils::strip_peer_id; +use subspace_networking::{ + peer_id, Config, KademliaMode, KnownPeersManager, KnownPeersManagerConfig, +}; use tracing::{debug, info, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; +/// Defines an expiration period for the peer marked for the removal for Kademlia DHT. +const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS: Duration = Duration::from_secs(3600); + +/// Size of the LRU cache for peers. +pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10000).expect("Not zero; qed"); + #[derive(Debug, Parser)] #[clap(about, version)] enum Command { @@ -155,6 +167,21 @@ async fn main() -> Result<(), Box> { }) .transpose()?; + let known_peers_registry_config = KnownPeersManagerConfig { + enable_known_peers_source: false, + cache_size: KNOWN_PEERS_CACHE_SIZE, + ignore_peer_list: strip_peer_id(bootstrap_nodes.clone()) + .into_iter() + .map(|(peer_id, _)| peer_id) + .collect::>(), + path: None, + failed_address_kademlia_removal_interval: + REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS, + failed_address_cache_removal_interval: + REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS, + }; + let known_peers_registry = KnownPeersManager::new(known_peers_registry_config)?; + let config = Config { listen_on, allow_non_global_addresses_in_dht: enable_private_ips, @@ -173,6 +200,7 @@ async fn main() -> Result<(), Box> { kademlia_mode: KademliaMode::Static(Mode::Server), external_addresses, metrics, + networking_parameters_registry: known_peers_registry.boxed(), ..Config::new(protocol_version.to_string(), keypair, (), None) }; diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index 3f0d3dcc81..a2737302a2 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -1,9 +1,7 @@ pub(crate) mod temporary_bans; mod transport; -use crate::behavior::persistent_parameters::{ - NetworkingParametersRegistry, StubNetworkingParametersManager, -}; +use crate::behavior::persistent_parameters::{KnownPeersRegistry, StubNetworkingParametersManager}; use crate::behavior::{Behavior, BehaviorConfig}; use crate::constructor::temporary_bans::TemporaryBans; use crate::constructor::transport::build_transport; @@ -222,8 +220,8 @@ pub struct Config { pub allow_non_global_addresses_in_dht: bool, /// How frequently should random queries be done using Kademlia DHT to populate routing table. pub initial_random_query_interval: Duration, - /// A reference to the `NetworkingParametersRegistry` implementation (optional). - pub networking_parameters_registry: Option>, + /// A reference to the `NetworkingParametersRegistry` implementation. + pub networking_parameters_registry: Box, /// The configuration for the `RequestResponsesBehaviour` protocol. pub request_response_protocols: Vec>, /// Defines set of peers with a permanent connection (and reconnection if necessary). @@ -371,7 +369,7 @@ where local_records_provider, allow_non_global_addresses_in_dht: false, initial_random_query_interval: Duration::from_secs(1), - networking_parameters_registry: None, + networking_parameters_registry: StubNetworkingParametersManager.boxed(), request_response_protocols: Vec::new(), yamux_config, reserved_peers: Vec::new(), @@ -638,8 +636,7 @@ where swarm, shared_weak, next_random_query_interval: initial_random_query_interval, - networking_parameters_registry: networking_parameters_registry - .unwrap_or(StubNetworkingParametersManager.boxed()), + networking_parameters_registry, reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(), temporary_bans, metrics, diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 7dfb5cf543..06200bc99c 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -28,7 +28,8 @@ mod shared; pub mod utils; pub use crate::behavior::persistent_parameters::{ - NetworkParametersPersistenceError, NetworkingParametersManager, + KnownPeersManager, KnownPeersManagerConfig, KnownPeersManagerPersistenceError, + KnownPeersRegistry, PeerAddressRemovedEvent, }; pub use crate::node::{ GetClosestPeersError, Node, SendRequestError, SubscribeError, TopicSubscription, @@ -53,3 +54,4 @@ pub use protocols::request_response::handlers::segment_header::{ pub use shared::{NewPeerInfo, PeerDiscovered}; pub use utils::multihash::Multihash; pub use utils::unique_record_binary_heap::{KeyWrapper, UniqueRecordBinaryHeap}; +pub use utils::PeerAddress; diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 940f36fe5b..df64166809 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -1,5 +1,5 @@ use crate::behavior::persistent_parameters::{ - append_p2p_suffix, remove_p2p_suffix, NetworkingParametersRegistry, PeerAddressRemovedEvent, + append_p2p_suffix, remove_p2p_suffix, KnownPeersRegistry, PeerAddressRemovedEvent, PEERS_ADDRESSES_BATCH_SIZE, }; use crate::behavior::{ @@ -120,7 +120,7 @@ where /// Defines an interval between periodical tasks. periodical_tasks_interval: Pin>>, /// Manages the networking parameters like known peers and addresses - networking_parameters_registry: Box, + networking_parameters_registry: Box, /// Defines set of peers with a permanent connection (and reconnection if necessary). reserved_peers: HashMap, /// Temporarily banned peers. @@ -162,7 +162,7 @@ where pub(crate) swarm: Swarm>>, pub(crate) shared_weak: Weak, pub(crate) next_random_query_interval: Duration, - pub(crate) networking_parameters_registry: Box, + pub(crate) networking_parameters_registry: Box, pub(crate) reserved_peers: HashMap, pub(crate) temporary_bans: Arc>, pub(crate) metrics: Option, @@ -398,6 +398,21 @@ where fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) { trace!(?event, "Peer addressed removed event.",); + let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone()) + .into_iter() + .map(|(peer_id, _)| peer_id) + .collect::>(); + + if bootstrap_node_ids.contains(&event.peer_id) { + debug!( + ?event, + ?bootstrap_node_ids, + "Skipped removing bootstrap node from Kademlia buckets." + ); + + return; + } + // Remove both versions of the address self.swarm.behaviour_mut().kademlia.remove_address( &event.peer_id, diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index dec1013d51..b039a9c0c8 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -113,8 +113,8 @@ impl CollectionBatcher { } } -// Convenience alias for peer ID and its multiaddresses. -pub(crate) type PeerAddress = (PeerId, Multiaddr); +/// Convenience alias for peer ID and its multiaddresses. +pub type PeerAddress = (PeerId, Multiaddr); /// Helper function. Converts multiaddresses to a tuple with peer ID removing the peer Id suffix. /// It logs incorrect multiaddresses. diff --git a/crates/subspace-node/src/bin/subspace-node.rs b/crates/subspace-node/src/bin/subspace-node.rs index b822c6e66d..6b0f0d392c 100644 --- a/crates/subspace-node/src/bin/subspace-node.rs +++ b/crates/subspace-node/src/bin/subspace-node.rs @@ -501,9 +501,7 @@ fn main() -> Result<(), Error> { DsnConfig { keypair, - base_path: cli.run.base_path()?.map(|base_path| { - base_path.config_dir(consensus_chain_config.chain_spec.id()) - }), + base_path: consensus_chain_config.base_path.path().into(), listen_on: cli.dsn_listen_on, bootstrap_nodes: dsn_bootstrap_nodes, reserved_peers: cli.dsn_reserved_peers, diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index a93e0eeb80..7b6b763869 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -3,6 +3,7 @@ use sc_client_api::AuxStore; use sc_consensus_subspace::archiver::SegmentHeadersStore; use std::collections::HashSet; use std::fs; +use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; use subspace_core_primitives::{SegmentHeader, SegmentIndex}; @@ -11,9 +12,10 @@ use subspace_networking::libp2p::metrics::Metrics; use subspace_networking::libp2p::{identity, Multiaddr}; use subspace_networking::utils::strip_peer_id; use subspace_networking::{ - CreationError, KademliaMode, NetworkParametersPersistenceError, NetworkingParametersManager, - Node, NodeRunner, PeerInfoProvider, PieceByIndexRequestHandler, - SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, + CreationError, KademliaMode, KnownPeersManager, KnownPeersManagerConfig, + KnownPeersManagerPersistenceError, Node, NodeRunner, PeerInfoProvider, + PieceByIndexRequestHandler, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, + SegmentHeaderResponse, }; use thiserror::Error; use tracing::{debug, error, trace}; @@ -22,6 +24,9 @@ const SEGMENT_HEADERS_NUMBER_LIMIT: u64 = 1000; /// Should be sufficient number of target connections for everyone, limits are higher const TARGET_CONNECTIONS: u32 = 15; +/// Size of the LRU cache for peers. +pub const KNOWN_PEERS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed"); + /// Errors that might happen during DSN configuration. #[derive(Debug, Error)] pub enum DsnConfigurationError { @@ -30,7 +35,7 @@ pub enum DsnConfigurationError { CreationError(#[from] CreationError), /// Network parameter manager error. #[error("Network parameter manager error: {0}")] - NetworkParameterManagerError(#[from] NetworkParametersPersistenceError), + NetworkParameterManagerError(#[from] KnownPeersManagerPersistenceError), } /// DSN configuration parameters. @@ -52,7 +57,7 @@ pub struct DsnConfig { pub allow_non_global_addresses_in_dht: bool, /// System base path. - pub base_path: Option, + pub base_path: PathBuf, /// Defines max established incoming swarm connection limit. pub max_in_connections: u32, @@ -87,25 +92,26 @@ where let mut metric_registry = Registry::default(); let metrics = enable_metrics.then(|| Metrics::new(&mut metric_registry)); - let networking_parameters_registry = dsn_config - .base_path - .map(|path| { - // TODO: Remove this in the future after enough upgrade time that this no longer exist - if path.join("known_addresses_db").is_dir() { - let _ = fs::remove_file(path.join("known_addresses_db")); - } - let file_path = path.join("known_addresses.bin"); - - NetworkingParametersManager::new( - &file_path, - strip_peer_id(dsn_config.bootstrap_nodes.clone()) - .into_iter() - .map(|(peer_id, _)| peer_id) - .collect::>(), - ) - .map(NetworkingParametersManager::boxed) + let networking_parameters_registry = { + let path = dsn_config.base_path; + + // TODO: Remove this in the future after enough upgrade time that this no longer exist + if path.join("known_addresses_db").is_dir() { + let _ = fs::remove_file(path.join("known_addresses_db")); + } + let file_path = path.join("known_addresses.bin"); + + KnownPeersManager::new(KnownPeersManagerConfig { + path: Some(file_path.into_boxed_path()), + ignore_peer_list: strip_peer_id(dsn_config.bootstrap_nodes.clone()) + .into_iter() + .map(|(peer_id, _)| peer_id) + .collect::>(), + cache_size: KNOWN_PEERS_CACHE_SIZE, + ..Default::default() }) - .transpose()?; + .map(KnownPeersManager::boxed)? + }; let keypair = dsn_config.keypair.clone(); let default_networking_config = subspace_networking::Config::new(