Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

litep2p: Update network backend to v0.7.0 #5609

Merged
merged 19 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 117 additions & 267 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.6.2" }
litep2p = { version = "0.7.0", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_5609.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Update litep2p network backend to v0.7.0

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR updates the Litep2p network backend to version 0.7.0.
This new release introduces several new features, improvements, and fixes to the litep2p library.
Key updates include enhanced error handling propagated through metrics, configurable connection limits,
and a new API for managing public addresses.

The Identify protocol no longer includes public addresses in its configuration.
Instead, we rely on the `litep2p.public_addresses` interface to propagate external addresses of the node.

crates:
- name: sc-network
bump: minor
8 changes: 3 additions & 5 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,9 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down
73 changes: 51 additions & 22 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
use litep2p::{
config::ConfigBuilder,
crypto::ed25519::Keypair,
error::{DialError, NegotiationError},
executor::Executor,
protocol::{
libp2p::{
Expand All @@ -64,15 +65,14 @@ use litep2p::{
},
transport::{
tcp::config::Config as TcpTransportConfig,
websocket::config::Config as WebSocketTransportConfig, Endpoint,
websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
},
types::{
multiaddr::{Multiaddr, Protocol},
ConnectionId,
},
Error as Litep2pError, Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
};
use parking_lot::RwLock;
use prometheus_endpoint::Registry;

use sc_client_api::BlockBackend;
Expand Down Expand Up @@ -183,9 +183,6 @@ pub struct Litep2pNetworkBackend {

/// Prometheus metrics.
metrics: Option<Metrics>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
}

impl Litep2pNetworkBackend {
Expand Down Expand Up @@ -557,6 +554,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
.with_libp2p_ping(ping_config)
.with_libp2p_identify(identify_config)
.with_libp2p_kademlia(kademlia_config)
.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
))
.with_executor(executor);

if let Some(config) = maybe_mdns_config {
Expand All @@ -570,15 +570,22 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let litep2p =
Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

let external_addresses: Arc<RwLock<HashSet<Multiaddr>>> = Arc::new(RwLock::new(
HashSet::from_iter(network_config.public_addresses.iter().cloned().map(Into::into)),
));
litep2p.listen_addresses().for_each(|address| {
log::debug!(target: LOG_TARGET, "listening on: {address}");

listen_addresses.write().insert(address.clone());
});

let public_addresses = litep2p.public_addresses();
for address in network_config.public_addresses.iter() {
if let Err(err) = public_addresses.add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"failed to add public address {address:?}: {err:?}",
);
}
}

let network_service = Arc::new(Litep2pNetworkService::new(
local_peer_id,
keypair.clone(),
Expand All @@ -588,7 +595,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
block_announce_protocol.clone(),
request_response_senders,
Arc::clone(&listen_addresses),
Arc::clone(&external_addresses),
public_addresses,
));

// register rest of the metrics now that `Litep2p` has been created
Expand All @@ -614,7 +621,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
event_streams: out_events::OutChannels::new(None)?,
peers: HashMap::new(),
litep2p,
external_addresses,
})
}

Expand Down Expand Up @@ -917,9 +923,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
}
Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
let mut addresses = self.external_addresses.write();

if addresses.insert(address.clone()) {
if let Err(err) = self.litep2p.public_addresses().add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"🔍 Failed to add discovered external address {address:?}: {err:?}",
);
} else {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
}
}
Expand Down Expand Up @@ -1006,20 +1015,40 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
Some(Litep2pEvent::DialFailure { address, error }) => {
log::trace!(
log::debug!(
target: LOG_TARGET,
"failed to dial peer at {address:?}: {error:?}",
);

let reason = match error {
Litep2pError::PeerIdMismatch(_, _) => "invalid-peer-id",
Litep2pError::Timeout | Litep2pError::TransportError(_) |
Litep2pError::IoError(_) | Litep2pError::WebSocket(_) => "transport-error",
_ => "other",
};
if let Some(metrics) = &self.metrics {
let reason = match error {
DialError::Timeout => "timeout",
DialError::AddressError(_) => "invalid-address",
DialError::DnsError(_) => "cannot-resolve-dns",
DialError::NegotiationError(error) => match error {
NegotiationError::Timeout => "timeout",
NegotiationError::PeerIdMissing => "missing-peer-id",
NegotiationError::StateMismatch => "state-mismatch",
NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
NegotiationError::SnowError(_) => "noise-error",
NegotiationError::ParseError(_) => "parse-error",
NegotiationError::IoError(_) => "io-error",
NegotiationError::WebSocket(_) => "webscoket-error",
}
};

metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
}
}
Some(Litep2pEvent::ListDialFailures { errors }) => {
log::debug!(
target: LOG_TARGET,
"failed to dial peer on multiple addresses {errors:?}",
);

if let Some(metrics) = &self.metrics {
metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
}
}
_ => {}
Expand Down
16 changes: 9 additions & 7 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use crate::litep2p::Record;
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use litep2p::{crypto::ed25519::Keypair, types::multiaddr::Multiaddr as LiteP2pMultiaddr};
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use parking_lot::RwLock;

use sc_network_common::{
Expand Down Expand Up @@ -196,7 +199,7 @@ pub struct Litep2pNetworkService {
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
}

impl Litep2pNetworkService {
Expand All @@ -210,7 +213,7 @@ impl Litep2pNetworkService {
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
) -> Self {
Self {
local_peer_id,
Expand Down Expand Up @@ -323,9 +326,8 @@ impl NetworkStatusProvider for Litep2pNetworkService {
.collect(),
external_addresses: self
.external_addresses
.read()
.iter()
.cloned()
.get_addresses()
.into_iter()
.map(|a| Multiaddr::from(a).into())
.collect(),
connected_peers: HashMap::new(),
Expand Down Expand Up @@ -491,7 +493,7 @@ impl NetworkEventStream for Litep2pNetworkService {

impl NetworkStateInfo for Litep2pNetworkService {
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.read().iter().cloned().map(Into::into).collect()
self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
}

fn listen_addresses(&self) -> Vec<Multiaddr> {
Expand Down
31 changes: 29 additions & 2 deletions substrate/client/network/src/litep2p/shim/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::{

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use litep2p::{
error::{ImmediateDialError, NegotiationError, SubstreamError},
protocol::request_response::{
DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
RequestResponseHandle,
},
types::RequestId,
};
Expand Down Expand Up @@ -372,7 +374,32 @@ impl RequestResponseProtocol {
let status = match error {
RequestResponseError::NotConnected =>
Some((RequestFailure::NotConnected, "not-connected")),
RequestResponseError::Rejected => Some((RequestFailure::Refused, "rejected")),
RequestResponseError::Rejected(reason) => {
let reason = match reason {
RejectReason::ConnectionClosed => "connection-closed",
RejectReason::SubstreamClosed => "substream-closed",
RejectReason::SubstreamOpenError(substream_error) => match substream_error {
SubstreamError::NegotiationError(NegotiationError::Timeout) =>
"substream-timeout",
_ => "substream-open-error",
},
RejectReason::DialFailed(None) => "dial-failed",
RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
"dial-already-connected",
RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
"dial-peerid-missing",
RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
"dial-tried-to-dial-self",
RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
"dial-no-address-available",
RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
"dial-task-closed",
RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
"dial-channel-clogged",
};

Some((RequestFailure::Refused, reason))
},
RequestResponseError::Timeout =>
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
RequestResponseError::Canceled => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ async fn too_many_inbound_requests() {
match handle2.next().await {
Some(RequestResponseEvent::RequestFailed { peer, error, .. }) => {
assert_eq!(peer, peer1);
assert_eq!(error, RequestResponseError::Rejected);
assert_eq!(
error,
RequestResponseError::Rejected(
litep2p::protocol::request_response::RejectReason::SubstreamClosed
)
);
},
event => panic!("inavlid event: {event:?}"),
}
Expand Down
Loading