Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

litep2p: Update network backend to v0.7.0 #5609

Merged
merged 19 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 117 additions & 267 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.6.2" }
litep2p = { version = "0.7.0", default-features = false }
lexnv marked this conversation as resolved.
Show resolved Hide resolved
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ sp-blockchain = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
wasm-timer = { workspace = true }
litep2p = { workspace = true }
litep2p = { workspace = true, features = ["websocket"] }
lexnv marked this conversation as resolved.
Show resolved Hide resolved
once_cell = { workspace = true }
void = { workspace = true }
schnellru = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

// The public addresses are now are added on the litep2p object instead,
// using `litep2p.public_addresses` object.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down
72 changes: 50 additions & 22 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
use litep2p::{
config::ConfigBuilder,
crypto::ed25519::Keypair,
error::DialError,
executor::Executor,
protocol::{
libp2p::{
Expand All @@ -64,15 +65,14 @@ use litep2p::{
},
transport::{
tcp::config::Config as TcpTransportConfig,
websocket::config::Config as WebSocketTransportConfig, Endpoint,
websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
},
types::{
multiaddr::{Multiaddr, Protocol},
ConnectionId,
},
Error as Litep2pError, Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
};
use parking_lot::RwLock;
use prometheus_endpoint::Registry;

use sc_client_api::BlockBackend;
Expand Down Expand Up @@ -183,9 +183,6 @@ pub struct Litep2pNetworkBackend {

/// Prometheus metrics.
metrics: Option<Metrics>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
}

impl Litep2pNetworkBackend {
Expand Down Expand Up @@ -557,6 +554,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
.with_libp2p_ping(ping_config)
.with_libp2p_identify(identify_config)
.with_libp2p_kademlia(kademlia_config)
.with_connection_limits(
// By default litep2p accepts only two connections per peer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need multiple connections per peer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Litep2p accepts at most 2 connections with the remote peer to handle cases when both peers dial each other simultaneously. For libp2p, we configure this manually to 2 connections for similar reasons:

/// The maximum allowed number of established connections per peer.
///
/// Typically, and by design of the network behaviours in this crate,
/// there is a single established connection per peer. However, to
/// avoid unnecessary and nondeterministic connection closure in
/// case of (possibly repeated) simultaneous dialing attempts between
/// two peers, the per-peer connection limit is not set to 1 but 2.
const MAX_CONNECTIONS_PER_PEER: usize = 2;

ConnectionLimitsConfig::default().max_incoming_connections(Some(
crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize,
)),
)
.with_executor(executor);

if let Some(config) = maybe_mdns_config {
Expand All @@ -570,15 +573,22 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let litep2p =
Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

let external_addresses: Arc<RwLock<HashSet<Multiaddr>>> = Arc::new(RwLock::new(
HashSet::from_iter(network_config.public_addresses.iter().cloned().map(Into::into)),
));
litep2p.listen_addresses().for_each(|address| {
log::debug!(target: LOG_TARGET, "listening on: {address}");

listen_addresses.write().insert(address.clone());
});

let public_addresses = litep2p.public_addresses();
for address in network_config.public_addresses.iter() {
if let Err(err) = public_addresses.add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"failed to add public address {address:?}: {err:?}",
);
}
}

let network_service = Arc::new(Litep2pNetworkService::new(
local_peer_id,
keypair.clone(),
Expand All @@ -588,7 +598,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
block_announce_protocol.clone(),
request_response_senders,
Arc::clone(&listen_addresses),
Arc::clone(&external_addresses),
public_addresses,
));

// register rest of the metrics now that `Litep2p` has been created
Expand All @@ -614,7 +624,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
event_streams: out_events::OutChannels::new(None)?,
peers: HashMap::new(),
litep2p,
external_addresses,
})
}

Expand Down Expand Up @@ -917,9 +926,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
}
Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
let mut addresses = self.external_addresses.write();

if addresses.insert(address.clone()) {
if let Err(err) = self.litep2p.public_addresses().add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"🔍 Failed to add discovered external address {address:?}: {err:?}",
);
} else {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
}
}
Expand Down Expand Up @@ -1006,20 +1018,36 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
Some(Litep2pEvent::DialFailure { address, error }) => {
log::trace!(
log::debug!(
target: LOG_TARGET,
"failed to dial peer at {address:?}: {error:?}",
);

let reason = match error {
Litep2pError::PeerIdMismatch(_, _) => "invalid-peer-id",
Litep2pError::Timeout | Litep2pError::TransportError(_) |
Litep2pError::IoError(_) | Litep2pError::WebSocket(_) => "transport-error",
_ => "other",
};
if let Some(metrics) = &self.metrics {
let reason = match error {
DialError::Timeout => "timeout".to_string(),
DialError::AddressError(_) => "invalid-address".to_string(),
DialError::DnsError(_) => "cannot-resolve-dns".to_string(),
DialError::NegotiationError(error) => match error {
litep2p::error::NegotiationError::Timeout => "timeout".to_string(),
litep2p::error::NegotiationError::PeerIdMissing => "missing-peer-id".to_string(),
litep2p::error::NegotiationError::StateMismatch => "state-mismatch".to_string(),
litep2p::error::NegotiationError::PeerIdMismatch(_, _) => "peer-id-missmatch".to_string(),
error => format!("negotiation-error-{:?}", error),
}
};

metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
}
}
Some(Litep2pEvent::ListDialFailures { errors }) => {
log::debug!(
target: LOG_TARGET,
"failed to dial peer on multiple addresses {errors:?}",
);

if let Some(metrics) = &self.metrics {
metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
}
}
_ => {}
Expand Down
16 changes: 9 additions & 7 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use crate::litep2p::Record;
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use litep2p::{crypto::ed25519::Keypair, types::multiaddr::Multiaddr as LiteP2pMultiaddr};
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use parking_lot::RwLock;

use sc_network_common::{
Expand Down Expand Up @@ -196,7 +199,7 @@ pub struct Litep2pNetworkService {
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
}

impl Litep2pNetworkService {
Expand All @@ -210,7 +213,7 @@ impl Litep2pNetworkService {
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
) -> Self {
Self {
local_peer_id,
Expand Down Expand Up @@ -323,9 +326,8 @@ impl NetworkStatusProvider for Litep2pNetworkService {
.collect(),
external_addresses: self
.external_addresses
.read()
.iter()
.cloned()
.get_addresses()
.into_iter()
.map(|a| Multiaddr::from(a).into())
.collect(),
connected_peers: HashMap::new(),
Expand Down Expand Up @@ -491,7 +493,7 @@ impl NetworkEventStream for Litep2pNetworkService {

impl NetworkStateInfo for Litep2pNetworkService {
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.read().iter().cloned().map(Into::into).collect()
self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
}

fn listen_addresses(&self) -> Vec<Multiaddr> {
Expand Down
41 changes: 33 additions & 8 deletions substrate/client/network/src/litep2p/shim/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::{

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use litep2p::{
error::ImmediateDialError,
protocol::request_response::{
DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
RequestResponseHandle,
},
types::RequestId,
};
Expand Down Expand Up @@ -371,10 +373,33 @@ impl RequestResponseProtocol {

let status = match error {
RequestResponseError::NotConnected =>
Some((RequestFailure::NotConnected, "not-connected")),
RequestResponseError::Rejected => Some((RequestFailure::Refused, "rejected")),
Some((RequestFailure::NotConnected, "not-connected".to_string())),
RequestResponseError::Rejected(reason) => {
let reason = match reason {
RejectReason::ConnectionClosed => "connection-closed".to_string(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also better to use a Cow<'static, str> here. (Less allocations)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed the allocations entirely, since we don't really need to expose each variant of format!("substream-open-error: {:?}", substream_error)

RejectReason::SubstreamClosed => "substream-closed".to_string(),
RejectReason::SubstreamOpenError(substream_error) => {
format!("substream-open-error: {:?}", substream_error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want this as a metric name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's a good point, have remove the String allocation entirely and provided shorter str names, thanks 🙏

},
RejectReason::DialFailed(None) => "dial-failed".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
"dial-already-connected".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
"dial-peerid-missing".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
"dial-tried-to-dial-self".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
"dial-no-address-available".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
"dial-task-closed".to_string(),
RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
"dial-channel-clogged".to_string(),
};

Some((RequestFailure::Refused, reason))
},
RequestResponseError::Timeout =>
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout".to_string())),
RequestResponseError::Canceled => {
log::debug!(
target: LOG_TARGET,
Expand All @@ -389,7 +414,7 @@ impl RequestResponseProtocol {
"{}: tried to send too large request to {peer:?} ({request_id:?})",
self.protocol,
);
Some((RequestFailure::Refused, "payload-too-large"))
Some((RequestFailure::Refused, "payload-too-large".to_string()))
},
RequestResponseError::UnsupportedProtocol => match fallback_request {
Some((request, protocol)) => match self.request_tx.get(&protocol) {
Expand Down Expand Up @@ -428,15 +453,15 @@ impl RequestResponseProtocol {
peer,
);

Some((RequestFailure::Refused, "invalid-fallback-protocol"))
Some((RequestFailure::Refused, "invalid-fallback-protocol".to_string()))
},
},
None => Some((RequestFailure::Refused, "unsupported-protocol")),
None => Some((RequestFailure::Refused, "unsupported-protocol".to_string())),
},
};

if let Some((error, reason)) = status {
self.metrics.register_outbound_request_failure(reason);
self.metrics.register_outbound_request_failure(reason.as_str());
let _ = tx.send(Err(error));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ async fn too_many_inbound_requests() {
match handle2.next().await {
Some(RequestResponseEvent::RequestFailed { peer, error, .. }) => {
assert_eq!(peer, peer1);
assert_eq!(error, RequestResponseError::Rejected);
assert_eq!(
error,
RequestResponseError::Rejected(
litep2p::protocol::request_response::RejectReason::SubstreamClosed
)
);
},
event => panic!("inavlid event: {event:?}"),
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ documentation = "https://docs.rs/sc-network-types"
bs58 = { workspace = true, default-features = true }
ed25519-dalek = { workspace = true, default-features = true }
libp2p-identity = { features = ["ed25519", "peerid", "rand"], workspace = true }
litep2p = { workspace = true }
litep2p = { workspace = true, features = ["websocket"] }
lexnv marked this conversation as resolved.
Show resolved Hide resolved
log = { workspace = true, default-features = true }
multiaddr = { workspace = true }
multihash = { workspace = true }
Expand Down
Loading