Skip to content

Commit

Permalink
litep2p: Update network backend to v0.7.0 (#5609)
Browse files Browse the repository at this point in the history
This release introduces several new features, improvements, and fixes to
the litep2p library. Key updates include enhanced error handling,
configurable connection limits, and a new API for managing public
addresses.

For a detailed set of changes, see [litep2p
changelog](https://github.com/paritytech/litep2p/blob/master/CHANGELOG.md#070---2024-09-05).

This PR makes use of:
- connection limits to optimize network throughput
- better errors that are propagated to substrate metrics
- public addresses API to report healthy addresses to the Identify
protocol

Measuring warp sync time is a bit inaccurate since the network is not
deterministic and we might end up using faster peers (peers with more
resources to handle our requests). However, I did not see warp sync
times of 16 minutes, instead, they are roughly stabilized between 8 and
10 minutes.

For measuring warp-sync time, I've used
[sub-trige-logs](https://github.com/lexnv/sub-triage-logs/?tab=readme-ov-file#warp-time)

Phase | Time
 -|-
Warp  | 426.999999919s
State | 99.000000555s
Total | 526.000000474s

Phase | Time
 -|-
Warp  | 731.999999837s
State | 71.000000882s
Total | 803.000000719s

Closes: #4986

After exposing the `litep2p::public_addresses` interface, we can report
to litep2p confirmed external addresses. This should mitigate or at
least improve: #4925.
Will keep the issue around to confirm this.

We are one step closer to exposing similar metrics as libp2p:
#4681.

cc @paritytech/networking

- [x] Use public address interface to confirm addresses to identify
protocol

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Nov 15, 2024
1 parent dba2dd5 commit 141eb46
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.6.2" }
litep2p = { version = "0.7.0", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
21 changes: 21 additions & 0 deletions prdoc/pr_5609.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
title: Update litep2p network backend to v0.7.0

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR updates the Litep2p network backend to version 0.7.0.
This new release introduces several new features, improvements, and fixes to the litep2p library.
Key updates include enhanced error handling propagated through metrics, configurable connection limits,
and a new API for managing public addresses.

The Identify protocol no longer includes public addresses in its configuration.
Instead, we rely on the `litep2p.public_addresses` interface to propagate external addresses of the node.

Litep2p uses hickory DNS resolver (formerly known as trust DNS).
Similarly to the trust DNS, the hickory logs are silenced.

crates:
- name: sc-network
bump: patch
- name: sc-tracing
bump: minor
8 changes: 3 additions & 5 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,9 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down
78 changes: 55 additions & 23 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
use litep2p::{
config::ConfigBuilder,
crypto::ed25519::Keypair,
error::{DialError, NegotiationError},
executor::Executor,
protocol::{
libp2p::{
Expand All @@ -64,15 +65,14 @@ use litep2p::{
},
transport::{
tcp::config::Config as TcpTransportConfig,
websocket::config::Config as WebSocketTransportConfig, Endpoint,
websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
},
types::{
multiaddr::{Multiaddr, Protocol},
ConnectionId,
},
Error as Litep2pError, Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
};
use parking_lot::RwLock;
use prometheus_endpoint::Registry;

use sc_client_api::BlockBackend;
Expand Down Expand Up @@ -183,9 +183,6 @@ pub struct Litep2pNetworkBackend {

/// Prometheus metrics.
metrics: Option<Metrics>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
}

impl Litep2pNetworkBackend {
Expand Down Expand Up @@ -557,6 +554,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
.with_libp2p_ping(ping_config)
.with_libp2p_identify(identify_config)
.with_libp2p_kademlia(kademlia_config)
.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
))
.with_executor(executor);

if let Some(config) = maybe_mdns_config {
Expand All @@ -570,15 +570,22 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let litep2p =
Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

let external_addresses: Arc<RwLock<HashSet<Multiaddr>>> = Arc::new(RwLock::new(
HashSet::from_iter(network_config.public_addresses.iter().cloned().map(Into::into)),
));
litep2p.listen_addresses().for_each(|address| {
log::debug!(target: LOG_TARGET, "listening on: {address}");

listen_addresses.write().insert(address.clone());
});

let public_addresses = litep2p.public_addresses();
for address in network_config.public_addresses.iter() {
if let Err(err) = public_addresses.add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"failed to add public address {address:?}: {err:?}",
);
}
}

let network_service = Arc::new(Litep2pNetworkService::new(
local_peer_id,
keypair.clone(),
Expand All @@ -588,7 +595,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
block_announce_protocol.clone(),
request_response_senders,
Arc::clone(&listen_addresses),
Arc::clone(&external_addresses),
public_addresses,
));

// register rest of the metrics now that `Litep2p` has been created
Expand All @@ -614,7 +621,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
event_streams: out_events::OutChannels::new(None)?,
peers: HashMap::new(),
litep2p,
external_addresses,
})
}

Expand Down Expand Up @@ -917,10 +923,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
}
Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
let mut addresses = self.external_addresses.write();

if addresses.insert(address.clone()) {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
match self.litep2p.public_addresses().add_address(address.clone().into()) {
Ok(inserted) => if inserted {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
},
Err(err) => {
log::warn!(
target: LOG_TARGET,
"🔍 Failed to add discovered external address {address:?}: {err:?}",
);
},
}
}
Some(DiscoveryEvent::Ping { peer, rtt }) => {
Expand Down Expand Up @@ -1006,20 +1018,40 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
Some(Litep2pEvent::DialFailure { address, error }) => {
log::trace!(
log::debug!(
target: LOG_TARGET,
"failed to dial peer at {address:?}: {error:?}",
);

let reason = match error {
Litep2pError::PeerIdMismatch(_, _) => "invalid-peer-id",
Litep2pError::Timeout | Litep2pError::TransportError(_) |
Litep2pError::IoError(_) | Litep2pError::WebSocket(_) => "transport-error",
_ => "other",
};
if let Some(metrics) = &self.metrics {
let reason = match error {
DialError::Timeout => "timeout",
DialError::AddressError(_) => "invalid-address",
DialError::DnsError(_) => "cannot-resolve-dns",
DialError::NegotiationError(error) => match error {
NegotiationError::Timeout => "timeout",
NegotiationError::PeerIdMissing => "missing-peer-id",
NegotiationError::StateMismatch => "state-mismatch",
NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
NegotiationError::SnowError(_) => "noise-error",
NegotiationError::ParseError(_) => "parse-error",
NegotiationError::IoError(_) => "io-error",
NegotiationError::WebSocket(_) => "webscoket-error",
}
};

metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
}
}
Some(Litep2pEvent::ListDialFailures { errors }) => {
log::debug!(
target: LOG_TARGET,
"failed to dial peer on multiple addresses {errors:?}",
);

if let Some(metrics) = &self.metrics {
metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
}
}
_ => {}
Expand Down
16 changes: 9 additions & 7 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use crate::litep2p::Record;
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use litep2p::{crypto::ed25519::Keypair, types::multiaddr::Multiaddr as LiteP2pMultiaddr};
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use parking_lot::RwLock;

use sc_network_common::{
Expand Down Expand Up @@ -196,7 +199,7 @@ pub struct Litep2pNetworkService {
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
}

impl Litep2pNetworkService {
Expand All @@ -210,7 +213,7 @@ impl Litep2pNetworkService {
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
) -> Self {
Self {
local_peer_id,
Expand Down Expand Up @@ -323,9 +326,8 @@ impl NetworkStatusProvider for Litep2pNetworkService {
.collect(),
external_addresses: self
.external_addresses
.read()
.iter()
.cloned()
.get_addresses()
.into_iter()
.map(|a| Multiaddr::from(a).into())
.collect(),
connected_peers: HashMap::new(),
Expand Down Expand Up @@ -491,7 +493,7 @@ impl NetworkEventStream for Litep2pNetworkService {

impl NetworkStateInfo for Litep2pNetworkService {
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.read().iter().cloned().map(Into::into).collect()
self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
}

fn listen_addresses(&self) -> Vec<Multiaddr> {
Expand Down
31 changes: 29 additions & 2 deletions substrate/client/network/src/litep2p/shim/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::{

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use litep2p::{
error::{ImmediateDialError, NegotiationError, SubstreamError},
protocol::request_response::{
DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
RequestResponseHandle,
},
types::RequestId,
};
Expand Down Expand Up @@ -372,7 +374,32 @@ impl RequestResponseProtocol {
let status = match error {
RequestResponseError::NotConnected =>
Some((RequestFailure::NotConnected, "not-connected")),
RequestResponseError::Rejected => Some((RequestFailure::Refused, "rejected")),
RequestResponseError::Rejected(reason) => {
let reason = match reason {
RejectReason::ConnectionClosed => "connection-closed",
RejectReason::SubstreamClosed => "substream-closed",
RejectReason::SubstreamOpenError(substream_error) => match substream_error {
SubstreamError::NegotiationError(NegotiationError::Timeout) =>
"substream-timeout",
_ => "substream-open-error",
},
RejectReason::DialFailed(None) => "dial-failed",
RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
"dial-already-connected",
RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
"dial-peerid-missing",
RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
"dial-tried-to-dial-self",
RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
"dial-no-address-available",
RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
"dial-task-closed",
RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
"dial-channel-clogged",
};

Some((RequestFailure::Refused, reason))
},
RequestResponseError::Timeout =>
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
RequestResponseError::Canceled => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ async fn too_many_inbound_requests() {
match handle2.next().await {
Some(RequestResponseEvent::RequestFailed { peer, error, .. }) => {
assert_eq!(peer, peer1);
assert_eq!(error, RequestResponseError::Rejected);
assert_eq!(
error,
RequestResponseError::Rejected(
litep2p::protocol::request_response::RejectReason::SubstreamClosed
)
);
},
event => panic!("inavlid event: {event:?}"),
}
Expand Down
3 changes: 3 additions & 0 deletions substrate/client/tracing/src/logging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ where
.add_directive(
parse_default_directive("trust_dns_proto=off").expect("provided directive is valid"),
)
.add_directive(
parse_default_directive("hickory_proto=off").expect("provided directive is valid"),
)
.add_directive(
parse_default_directive("libp2p_mdns::behaviour::iface=off")
.expect("provided directive is valid"),
Expand Down

0 comments on commit 141eb46

Please sign in to comment.