diff --git a/p2p/src/behaviour/ceramic_peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs index 854a58299..f9e9aacbc 100644 --- a/p2p/src/behaviour/ceramic_peer_manager.rs +++ b/p2p/src/behaviour/ceramic_peer_manager.rs @@ -41,6 +41,7 @@ pub struct CeramicPeerManager { pub struct Info { pub last_rtt: Option, pub last_info: Option, + pub connected_point: Option, } impl Info { @@ -145,11 +146,16 @@ impl NetworkBehaviour for CeramicPeerManager { self.handle_connection_established(&event.peer_id) } - if let Some(info) = self.info.get_mut(&event.peer_id) { - if let Some(ref mut info) = info.last_info { - info.listen_addrs - .retain(|addr| !event.failed_addresses.contains(addr)) + let info = self.info.entry(event.peer_id).or_default(); + info.connected_point = Some(match event.endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address.clone(), + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr.clone() } + }); + if let Some(ref mut info) = info.last_info { + info.listen_addrs + .retain(|addr| !event.failed_addresses.contains(addr)) } } libp2p::swarm::FromSwarm::ConnectionClosed(event) => { diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 991aab345..32d71fb62 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -984,10 +984,16 @@ where .peer_manager .info_for_peer(&pid) .map(|info| { - info.last_info - .as_ref() - .map(|last_info| last_info.listen_addrs.clone()) - .unwrap_or_default() + info.connected_point + .iter() + .chain( + info.last_info + .as_ref() + .map(|last_info| last_info.listen_addrs.iter()) + .unwrap_or_default(), + ) + .cloned() + .collect() }) .unwrap_or_default(), ) diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 7743997b7..1dab520ee 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -18,16 +18,19 @@ mod tests; mod upgrade; use ceramic_core::{EventId, Interest}; +use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, swarm::{ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm}, }; use libp2p_identity::PeerId; use std::{ + cmp::min, collections::{btree_map::Entry, BTreeMap}, task::Poll, - time::{Duration, Instant}, + time::Duration, }; +use tokio::time::Instant; use tracing::{debug, trace, warn}; pub use crate::protocol::Recon; @@ -49,13 +52,20 @@ pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model"; pub struct Config { /// Start a new sync once the duration has past in the failed or synchronized state. /// Defaults to 1 second. - pub per_peer_sync_timeout: Duration, + pub per_peer_sync_delay: Duration, + /// Backoff sequential failures as multiples of delay. + pub per_peer_sync_backoff: f64, + /// Maximum delay between synchronization attempts. + /// Defaults to 10 minutes + pub per_peer_maximum_sync_delay: Duration, } impl Default for Config { fn default() -> Self { Self { - per_peer_sync_timeout: Duration::from_millis(1000), + per_peer_sync_delay: Duration::from_millis(1000), + per_peer_sync_backoff: 2.0, + per_peer_maximum_sync_delay: Duration::from_secs(60 * 10), } } } @@ -65,7 +75,6 @@ impl Default for Config { /// The Behavior tracks all peers on the network that speak the Recon protocol. /// It is responsible for starting and stopping syncs with various peers depending on the needs of /// the application. -#[derive(Debug)] pub struct Behaviour { interest: I, model: M, @@ -73,6 +82,21 @@ pub struct Behaviour { peers: BTreeMap, swarm_events_sender: tokio::sync::mpsc::Sender>, swarm_events_receiver: tokio::sync::mpsc::Receiver>, + next_sync: Option>, +} + +impl std::fmt::Debug for Behaviour { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Behaviour") + .field("interest", &self.interest) + .field("model", &self.model) + .field("config", &self.config) + .field("peers", &self.peers) + .field("swarm_events_sender", &self.swarm_events_sender) + .field("swarm_events_receiver", &self.swarm_events_receiver) + .field("next_sync", &"_") + .finish() + } } /// Information about a remote peer and its sync status. @@ -80,7 +104,8 @@ pub struct Behaviour { struct PeerInfo { status: PeerStatus, connections: Vec, - last_sync: Option, + next_sync: BTreeMap, + sync_delay: BTreeMap, } #[derive(Clone, Copy, Debug)] @@ -106,7 +131,10 @@ pub enum PeerStatus { stream_set: StreamSet, }, /// The last attempt to synchronize with the remote peer resulted in an error. - Failed, + Failed { + /// The stream_set that has failed synchronizing. + stream_set: StreamSet, + }, /// Local peer has stopped synchronizing with the remote peer and will not attempt to /// synchronize again. Stopped, @@ -127,6 +155,7 @@ impl Behaviour { peers: BTreeMap::new(), swarm_events_sender: tx, swarm_events_receiver: rx, + next_sync: None, } } @@ -162,7 +191,13 @@ where .or_insert_with(|| PeerInfo { status: PeerStatus::Waiting, connections: vec![connection_info], - last_sync: None, + next_sync: BTreeMap::from_iter([ + // Schedule all stream_sets initially + (StreamSet::Interest, Instant::now()), + // Schedule models after interests + (StreamSet::Model, Instant::now() + Duration::from_millis(1)), + ]), + sync_delay: Default::default(), }); } libp2p::swarm::FromSwarm::ConnectionClosed(info) => { @@ -214,8 +249,17 @@ where // is now idle. FromHandler::Succeeded { stream_set } => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { + debug!(%peer_id, ?stream_set, "synchronization succeeded with peer"); let info = entry.get_mut(); - info.last_sync = Some(Instant::now()); + let sync_delay = *info + .sync_delay + .get(&stream_set) + .unwrap_or(&self.config.per_peer_sync_delay); + info.next_sync + .insert(stream_set, Instant::now() + sync_delay); + // On success reset delay + info.sync_delay + .insert(stream_set, self.config.per_peer_sync_delay); info.status = PeerStatus::Synchronized { stream_set }; Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, @@ -229,12 +273,25 @@ where // The peer has failed to synchronized with us, mark the time and record that the peer connection // is now failed. - FromHandler::Failed(error) => { + FromHandler::Failed { stream_set, error } => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { let info = entry.get_mut(); - warn!(%peer_id, %error, "synchronization failed with peer"); - info.last_sync = Some(Instant::now()); - info.status = PeerStatus::Failed; + let sync_delay = *info + .sync_delay + .get(&stream_set) + .unwrap_or(&self.config.per_peer_sync_delay); + warn!(%peer_id, %error, ?sync_delay, ?stream_set, "synchronization failed with peer"); + info.next_sync + .insert(stream_set, Instant::now() + sync_delay); + // On failure increase sync delay + info.sync_delay.insert( + stream_set, + min( + sync_delay.mul_f64(self.config.per_peer_sync_backoff), + self.config.per_peer_maximum_sync_delay, + ), + ); + info.status = PeerStatus::Failed { stream_set }; Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, status: info.status, @@ -267,52 +324,37 @@ where if connection_info.dialer { match info.status { PeerStatus::Waiting | PeerStatus::Started { .. } | PeerStatus::Stopped => {} - PeerStatus::Failed => { - // Sync if its been a while since we last synchronized - let should_sync = if let Some(last_sync) = &info.last_sync { - last_sync.elapsed() > self.config.per_peer_sync_timeout - } else { - false - }; - if should_sync { + PeerStatus::Failed { .. } | PeerStatus::Synchronized { .. } => { + // Find earliest scheduled stream set + let (next_stream_set, next_sync) = + info.next_sync.iter().min_by_key(|(_, t)| *t).expect( + "next_sync should always be initialized with stream sets", + ); + debug!(?next_stream_set,?next_sync, now=?Instant::now(), "polling"); + // Sync if enough time has passed since we synced the stream set. + if *next_sync < Instant::now() { + self.next_sync = None; info.status = PeerStatus::Waiting; return Poll::Ready(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(connection_info.id), event: FromBehaviour::StartSync { - stream_set: StreamSet::Interest, - }, - }); - } - } - PeerStatus::Synchronized { stream_set } => { - // Sync if we just finished an interest sync or its been a while since we - // last synchronized. - let should_sync = stream_set == StreamSet::Interest - || if let Some(last_sync) = &info.last_sync { - last_sync.elapsed() > self.config.per_peer_sync_timeout - } else { - false - }; - if should_sync { - info.status = PeerStatus::Waiting; - let next_stream_set = match stream_set { - StreamSet::Interest => StreamSet::Model, - StreamSet::Model => StreamSet::Interest, - }; - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::One(connection_info.id), - event: FromBehaviour::StartSync { - stream_set: next_stream_set, + stream_set: *next_stream_set, }, }); + } else { + self.next_sync = + Some(Box::pin(tokio::time::sleep_until(*next_sync))); } } } } } } + // Ensure we are scheduled to be polled again when the next sync is ready + if let Some(ref mut next_sync) = &mut self.next_sync { + let _ = next_sync.poll_unpin(cx); + } Poll::Pending } diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index 257fe2711..5fed86625 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -95,8 +95,8 @@ pub enum State { WaitingInbound, RequestOutbound { stream_set: StreamSet }, WaitingOutbound, - Outbound(SyncFuture), - Inbound(SyncFuture), + Outbound(SyncFuture, StreamSet), + Inbound(SyncFuture, StreamSet), } impl std::fmt::Debug for State { @@ -109,8 +109,16 @@ impl std::fmt::Debug for State { .field("stream_set", stream_set) .finish(), Self::WaitingOutbound => f.debug_struct("WaitingOutbound").finish(), - Self::Outbound(_) => f.debug_tuple("Outbound").field(&"_").finish(), - Self::Inbound(_) => f.debug_tuple("Inbound").field(&"_").finish(), + Self::Outbound(_, stream_set) => f + .debug_tuple("Outbound") + .field(&"_") + .field(&stream_set) + .finish(), + Self::Inbound(_, stream_set) => f + .debug_tuple("Inbound") + .field(&"_") + .field(&stream_set) + .finish(), } } } @@ -121,10 +129,17 @@ pub enum FromBehaviour { } #[derive(Debug)] pub enum FromHandler { - Started { stream_set: StreamSet }, - Succeeded { stream_set: StreamSet }, + Started { + stream_set: StreamSet, + }, + Succeeded { + stream_set: StreamSet, + }, Stopped, - Failed(anyhow::Error), + Failed { + stream_set: StreamSet, + error: anyhow::Error, + }, } impl ConnectionHandler for Handler @@ -172,7 +187,8 @@ where let protocol = SubstreamProtocol::new(MultiReadyUpgrade::new(vec![stream_set]), ()); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }); } - State::Outbound(stream) | State::Inbound(stream) => { + State::Outbound(stream, stream_set) | State::Inbound(stream, stream_set) => { + let stream_set = *stream_set; if let Poll::Ready(result) = stream.poll_unpin(cx) { self.transition_state(State::Idle); match result { @@ -183,7 +199,10 @@ where } Err(e) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - FromHandler::Failed(e), + FromHandler::Failed { + stream_set, + error: e, + }, )); } } @@ -201,8 +220,8 @@ where State::RequestOutbound { .. } | State::WaitingOutbound { .. } | State::WaitingInbound - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} }, } } @@ -245,13 +264,13 @@ where ) .boxed(), }; - self.transition_state(State::Inbound(stream)); + self.transition_state(State::Inbound(stream, stream_set)); } // Ignore inbound connection when we are not expecting it State::RequestOutbound { .. } | State::WaitingOutbound { .. } - | State::Inbound(_) - | State::Outbound(_) => {} + | State::Inbound(_, _) + | State::Outbound(_, _) => {} } } libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound( @@ -282,14 +301,14 @@ where ) .boxed(), }; - self.transition_state(State::Outbound(stream)); + self.transition_state(State::Outbound(stream, stream_set)); } // Ignore outbound connection when we are not expecting it State::Idle | State::WaitingInbound | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } libp2p::swarm::handler::ConnectionEvent::AddressChange(_) => {} @@ -306,8 +325,8 @@ where State::Idle | State::WaitingOutbound { .. } | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } // We failed to upgrade the outbound connection. @@ -323,8 +342,8 @@ where State::Idle | State::WaitingInbound | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } event => { diff --git a/recon/src/libp2p/stream_set.rs b/recon/src/libp2p/stream_set.rs index 323409e44..e9d7e32ae 100644 --- a/recon/src/libp2p/stream_set.rs +++ b/recon/src/libp2p/stream_set.rs @@ -3,7 +3,7 @@ use anyhow::anyhow; use crate::libp2p::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL}; /// Represents a stream set key -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub enum StreamSet { /// Stream set of interest ranges Interest, diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 8dbe821f8..7f603f90e 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -134,12 +134,17 @@ macro_rules! setup_test { Metrics::register(&mut Registry::default()), ); + // Use shorter timings with large backoff for testing. + let config = crate::libp2p::Config { + per_peer_sync_delay: std::time::Duration::from_millis(10), + per_peer_sync_backoff: 100.0, + per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), + }; let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(alice_interest, alice, crate::libp2p::Config::default()) - }); - let swarm2 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(bob_interests, bob, crate::libp2p::Config::default()) + crate::libp2p::Behaviour::new(alice_interest, alice, config.clone()) }); + let swarm2 = + Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_interests, bob, config)); (swarm1, swarm2) }}; @@ -204,7 +209,9 @@ async fn initiator_model_error() { failed_peer, crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm2.local_peer_id().to_owned(), - status: PeerStatus::Failed + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } }) ); }; @@ -243,7 +250,120 @@ async fn responder_model_error() { p2_e4, crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), - status: PeerStatus::Failed + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } + }) + ); + }; + + fut.await; +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn model_error_backoff() { + let mut bob_model_store = BTreeStoreErrors::default(); + bob_model_store.set_error(Error::new_transient(anyhow::anyhow!( + "transient error should be handled" + ))); + let (mut swarm1, mut swarm2) = setup_test!( + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + bob_model_store, + BTreeStoreErrors::default(), + ); + + let fut = async move { + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + // Expect interests to sync twice in a row since models fail to sync + let ( + [p1_e1, p1_e2, p1_e3, p1_e4, p1_e5, p1_e6, p1_e7, p1_e8, p1_e9, p1_e10, p1_e11, p1_e12], + [p2_e1, p2_e2, p2_e3, p2_e4, p2_e5, p2_e6, p2_e7, p2_e8, p2_e9, p2_e10, p2_e11, p2_e12], + ): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + + let events = [ + [ + &p1_e1, &p1_e2, &p1_e3, &p1_e4, &p1_e5, &p1_e6, &p1_e7, &p1_e8, &p1_e9, &p1_e10, + &p1_e11, &p1_e12, + ], + [ + &p2_e1, &p2_e2, &p2_e3, &p2_e4, &p2_e5, &p2_e6, &p2_e7, &p2_e8, &p2_e9, &p2_e10, + &p2_e11, &p2_e12, + ], + ]; + + for ev in events.iter().flatten() { + info!("{:?}", ev); + } + let stream_sets: Vec<_> = events + .iter() + .map(|peer_events| { + peer_events + .iter() + .map(|ev| { + let crate::libp2p::Event::PeerEvent(PeerEvent { status, .. }) = ev; + match status { + PeerStatus::Waiting | PeerStatus::Stopped => None, + PeerStatus::Synchronized { stream_set } + | PeerStatus::Started { stream_set } + | PeerStatus::Failed { stream_set } => Some(*stream_set), + } + }) + .collect::>>() + }) + .collect(); + let expected_stream_set_order = vec![ + // First interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // First model sync + Some(StreamSet::Model), + Some(StreamSet::Model), + // Second interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // Second model sync with initial short backoff + Some(StreamSet::Model), + Some(StreamSet::Model), + // Third interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // Third model sync is skipped because the backoff pushed it past the interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + ]; + assert_eq!( + stream_sets, + vec![expected_stream_set_order.clone(), expected_stream_set_order] + ); + assert_eq!( + p2_e4, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm1.local_peer_id().to_owned(), + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } + }) + ); + assert_eq!( + p1_e12, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm2.local_peer_id().to_owned(), + status: PeerStatus::Synchronized { + stream_set: StreamSet::Interest + } + }) + ); + assert_eq!( + p2_e12, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm1.local_peer_id().to_owned(), + status: PeerStatus::Synchronized { + stream_set: StreamSet::Interest + } }) ); };