Skip to content

Commit

Permalink
feat: backoff recon synchronization to peers with bad data (#597)
Browse files Browse the repository at this point in the history
* feat: backoff recon synchronization to peers with bad data

With this change peers will back off synchronization attempts with peers
that fail synchronization. This will have the effect that the bootstrap
nodes do not talk to nodes that are sending bad data very often.

However the backoff state is tied to the connection state and is reset
if the connection resets, allowing developers to restart their node and
be in good standing with bootstrap nodes.

Additionally a small change was added to prefer using the connected
address to a peer when reporting the address of connected peers.

* fix: change default connection timeout to 15m

Keep connections around longer so we can remember that we have backed
off peers, otherwise the connection will close and reconnect, resetting
the backoff delay.

* fix: keep interests and model syncs separate

This change makes it so that interests and model syncs are independent.
We always start with the interests sync. But then after the initial
syncs they can independently be backed off.

This helps avoid resetting idle connections and therefore resetting the
backoff state, as there is likely some activity on the connection for
one of the two syncs.

* comment

* Update recon/src/libp2p.rs

Co-authored-by: Mohsin Zaidi <[email protected]>

---------

Co-authored-by: Mohsin Zaidi <[email protected]>
  • Loading branch information
nathanielc and smrz2001 authored Nov 15, 2024
1 parent 315d791 commit 1a895e1
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 81 deletions.
14 changes: 10 additions & 4 deletions p2p/src/behaviour/ceramic_peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct CeramicPeerManager {
pub struct Info {
pub last_rtt: Option<Duration>,
pub last_info: Option<IdentifyInfo>,
pub connected_point: Option<Multiaddr>,
}

impl Info {
Expand Down Expand Up @@ -145,11 +146,16 @@ impl NetworkBehaviour for CeramicPeerManager {
self.handle_connection_established(&event.peer_id)
}

if let Some(info) = self.info.get_mut(&event.peer_id) {
if let Some(ref mut info) = info.last_info {
info.listen_addrs
.retain(|addr| !event.failed_addresses.contains(addr))
let info = self.info.entry(event.peer_id).or_default();
info.connected_point = Some(match event.endpoint {
libp2p::core::ConnectedPoint::Dialer { address, .. } => address.clone(),
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
send_back_addr.clone()
}
});
if let Some(ref mut info) = info.last_info {
info.listen_addrs
.retain(|addr| !event.failed_addresses.contains(addr))
}
}
libp2p::swarm::FromSwarm::ConnectionClosed(event) => {
Expand Down
14 changes: 10 additions & 4 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,10 +984,16 @@ where
.peer_manager
.info_for_peer(&pid)
.map(|info| {
info.last_info
.as_ref()
.map(|last_info| last_info.listen_addrs.clone())
.unwrap_or_default()
info.connected_point
.iter()
.chain(
info.last_info
.as_ref()
.map(|last_info| last_info.listen_addrs.iter())
.unwrap_or_default(),
)
.cloned()
.collect()
})
.unwrap_or_default(),
)
Expand Down
132 changes: 87 additions & 45 deletions recon/src/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ mod tests;
mod upgrade;

use ceramic_core::{EventId, Interest};
use futures::{future::BoxFuture, FutureExt};
use libp2p::{
core::ConnectedPoint,
swarm::{ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm},
};
use libp2p_identity::PeerId;
use std::{
cmp::min,
collections::{btree_map::Entry, BTreeMap},
task::Poll,
time::{Duration, Instant},
time::Duration,
};
use tokio::time::Instant;
use tracing::{debug, trace, warn};

pub use crate::protocol::Recon;
Expand All @@ -49,13 +52,20 @@ pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model";
pub struct Config {
/// Start a new sync once the duration has passed in the failed or synchronized state.
/// Defaults to 1 second.
pub per_peer_sync_timeout: Duration,
pub per_peer_sync_delay: Duration,
/// Backoff sequential failures as multiples of delay.
pub per_peer_sync_backoff: f64,
/// Maximum delay between synchronization attempts.
/// Defaults to 10 minutes
pub per_peer_maximum_sync_delay: Duration,
}

impl Default for Config {
fn default() -> Self {
Self {
per_peer_sync_timeout: Duration::from_millis(1000),
per_peer_sync_delay: Duration::from_millis(1000),
per_peer_sync_backoff: 2.0,
per_peer_maximum_sync_delay: Duration::from_secs(60 * 10),
}
}
}
Expand All @@ -65,22 +75,37 @@ impl Default for Config {
/// The Behavior tracks all peers on the network that speak the Recon protocol.
/// It is responsible for starting and stopping syncs with various peers depending on the needs of
/// the application.
#[derive(Debug)]
pub struct Behaviour<I, M> {
interest: I,
model: M,
config: Config,
peers: BTreeMap<PeerId, PeerInfo>,
swarm_events_sender: tokio::sync::mpsc::Sender<ToSwarm<Event, FromBehaviour>>,
swarm_events_receiver: tokio::sync::mpsc::Receiver<ToSwarm<Event, FromBehaviour>>,
next_sync: Option<BoxFuture<'static, ()>>,
}

// Hand-written `Debug` implementation for `Behaviour`.
//
// All fields are formatted through their own `Debug` implementations except
// `next_sync`, which is rendered as the fixed placeholder string "_" instead of
// its actual value.
impl<I: std::fmt::Debug, M: std::fmt::Debug> std::fmt::Debug for Behaviour<I, M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Behaviour")
.field("interest", &self.interest)
.field("model", &self.model)
.field("config", &self.config)
.field("peers", &self.peers)
.field("swarm_events_sender", &self.swarm_events_sender)
.field("swarm_events_receiver", &self.swarm_events_receiver)
// Placeholder only: the pending sync timer future is not formatted.
// NOTE(review): presumably because the boxed future has no `Debug` impl — confirm.
.field("next_sync", &"_")
.finish()
}
}

/// Information about a remote peer and its sync status.
#[derive(Clone, Debug)]
struct PeerInfo {
status: PeerStatus,
connections: Vec<ConnectionInfo>,
last_sync: Option<Instant>,
next_sync: BTreeMap<StreamSet, Instant>,
sync_delay: BTreeMap<StreamSet, Duration>,
}

#[derive(Clone, Copy, Debug)]
Expand All @@ -106,7 +131,10 @@ pub enum PeerStatus {
stream_set: StreamSet,
},
/// The last attempt to synchronize with the remote peer resulted in an error.
Failed,
Failed {
/// The stream_set that has failed synchronizing.
stream_set: StreamSet,
},
/// Local peer has stopped synchronizing with the remote peer and will not attempt to
/// synchronize again.
Stopped,
Expand All @@ -127,6 +155,7 @@ impl<I, M> Behaviour<I, M> {
peers: BTreeMap::new(),
swarm_events_sender: tx,
swarm_events_receiver: rx,
next_sync: None,
}
}

Expand Down Expand Up @@ -162,7 +191,13 @@ where
.or_insert_with(|| PeerInfo {
status: PeerStatus::Waiting,
connections: vec![connection_info],
last_sync: None,
next_sync: BTreeMap::from_iter([
// Schedule all stream_sets initially
(StreamSet::Interest, Instant::now()),
// Schedule models after interests
(StreamSet::Model, Instant::now() + Duration::from_millis(1)),
]),
sync_delay: Default::default(),
});
}
libp2p::swarm::FromSwarm::ConnectionClosed(info) => {
Expand Down Expand Up @@ -214,8 +249,17 @@ where
// is now idle.
FromHandler::Succeeded { stream_set } => {
if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) {
debug!(%peer_id, ?stream_set, "synchronization succeeded with peer");
let info = entry.get_mut();
info.last_sync = Some(Instant::now());
let sync_delay = *info
.sync_delay
.get(&stream_set)
.unwrap_or(&self.config.per_peer_sync_delay);
info.next_sync
.insert(stream_set, Instant::now() + sync_delay);
// On success reset delay
info.sync_delay
.insert(stream_set, self.config.per_peer_sync_delay);
info.status = PeerStatus::Synchronized { stream_set };
Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent {
remote_peer_id: peer_id,
Expand All @@ -229,12 +273,25 @@ where

// The peer has failed to synchronize with us, mark the time and record that the peer connection
// is now failed.
FromHandler::Failed(error) => {
FromHandler::Failed { stream_set, error } => {
if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) {
let info = entry.get_mut();
warn!(%peer_id, %error, "synchronization failed with peer");
info.last_sync = Some(Instant::now());
info.status = PeerStatus::Failed;
let sync_delay = *info
.sync_delay
.get(&stream_set)
.unwrap_or(&self.config.per_peer_sync_delay);
warn!(%peer_id, %error, ?sync_delay, ?stream_set, "synchronization failed with peer");
info.next_sync
.insert(stream_set, Instant::now() + sync_delay);
// On failure increase sync delay
info.sync_delay.insert(
stream_set,
min(
sync_delay.mul_f64(self.config.per_peer_sync_backoff),
self.config.per_peer_maximum_sync_delay,
),
);
info.status = PeerStatus::Failed { stream_set };
Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent {
remote_peer_id: peer_id,
status: info.status,
Expand Down Expand Up @@ -267,52 +324,37 @@ where
if connection_info.dialer {
match info.status {
PeerStatus::Waiting | PeerStatus::Started { .. } | PeerStatus::Stopped => {}
PeerStatus::Failed => {
// Sync if its been a while since we last synchronized
let should_sync = if let Some(last_sync) = &info.last_sync {
last_sync.elapsed() > self.config.per_peer_sync_timeout
} else {
false
};
if should_sync {
PeerStatus::Failed { .. } | PeerStatus::Synchronized { .. } => {
// Find earliest scheduled stream set
let (next_stream_set, next_sync) =
info.next_sync.iter().min_by_key(|(_, t)| *t).expect(
"next_sync should always be initialized with stream sets",
);
debug!(?next_stream_set,?next_sync, now=?Instant::now(), "polling");
// Sync if enough time has passed since we synced the stream set.
if *next_sync < Instant::now() {
self.next_sync = None;
info.status = PeerStatus::Waiting;
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(connection_info.id),
event: FromBehaviour::StartSync {
stream_set: StreamSet::Interest,
},
});
}
}
PeerStatus::Synchronized { stream_set } => {
// Sync if we just finished an interest sync or its been a while since we
// last synchronized.
let should_sync = stream_set == StreamSet::Interest
|| if let Some(last_sync) = &info.last_sync {
last_sync.elapsed() > self.config.per_peer_sync_timeout
} else {
false
};
if should_sync {
info.status = PeerStatus::Waiting;
let next_stream_set = match stream_set {
StreamSet::Interest => StreamSet::Model,
StreamSet::Model => StreamSet::Interest,
};
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(connection_info.id),
event: FromBehaviour::StartSync {
stream_set: next_stream_set,
stream_set: *next_stream_set,
},
});
} else {
self.next_sync =
Some(Box::pin(tokio::time::sleep_until(*next_sync)));
}
}
}
}
}
}
// Ensure we are scheduled to be polled again when the next sync is ready
if let Some(ref mut next_sync) = &mut self.next_sync {
let _ = next_sync.poll_unpin(cx);
}

Poll::Pending
}
Expand Down
Loading

0 comments on commit 1a895e1

Please sign in to comment.