Skip to content

Commit

Permalink
fix: remove Ceramic peer discovery
Browse files Browse the repository at this point in the history
We are seeing issue related to Kademlia and seem to be connected to
Ceramic peer discovery. As this feature is not needed currently, its
being removed to eliminate any compounding variables. Once its needed it
can be reintroduced and fully tested against our established
baseline.
  • Loading branch information
nathanielc committed Oct 30, 2023
1 parent 3bd79d3 commit 953e09b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 82 deletions.
8 changes: 1 addition & 7 deletions one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,7 @@ impl Daemon {
None
};
let ipfs = Ipfs::builder()
.with_p2p(
p2p_config,
keypair,
recons,
&network.name(),
sql_pool.clone(),
)
.with_p2p(p2p_config, keypair, recons, sql_pool.clone())
.await?
.build(sql_pool.clone())
.await?;
Expand Down
11 changes: 1 addition & 10 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Builder<Init> {
libp2p_config: Libp2pConfig,
keypair: Keypair,
recons: Option<(I, M)>,
ceramic_peers_key: &str,
sql_pool: SqlitePool,
) -> anyhow::Result<Builder<WithP2p>>
where
Expand All @@ -52,15 +51,7 @@ impl Builder<Init> {

config.libp2p = libp2p_config;

let mut p2p = Node::new(
config,
addr.clone(),
keypair,
recons,
ceramic_peers_key,
sql_pool,
)
.await?;
let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool).await?;

let task = task::spawn(async move {
if let Err(err) = p2p.run().await {
Expand Down
40 changes: 14 additions & 26 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ use ceramic_core::{EventId, Interest};
use cid::Cid;
use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig, Store};
use iroh_rpc_client::Client;
use libp2p::gossipsub::{self, MessageAuthenticity};
use libp2p::identify;
use libp2p::kad::{
self,
store::{MemoryStore, MemoryStoreConfig},
RecordKey,
};
use libp2p::mdns::tokio::Behaviour as Mdns;
use libp2p::multiaddr::Protocol;
use libp2p::ping::Behaviour as Ping;
use libp2p::relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{autonat, dcutr};
use libp2p::{
autonat,
connection_limits::{self, ConnectionLimits},
kad::QueryId,
dcutr,
gossipsub::{self, MessageAuthenticity},
identify,
kad::{
self,
store::{MemoryStore, MemoryStoreConfig},
},
mdns::tokio::Behaviour as Mdns,
multiaddr::Protocol,
ping::Behaviour as Ping,
relay,
swarm::behaviour::toggle::Toggle,
swarm::NetworkBehaviour,
};
use libp2p_identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
Expand Down Expand Up @@ -266,17 +265,6 @@ where
}
Ok(())
}
pub fn discover_ceramic_peers(&mut self, key: &RecordKey) -> Option<QueryId> {
info!(?key, "discovering Ceramic peers");
if let Some(kad) = self.kad.as_mut() {
if let Err(err) = kad.start_providing(key.clone()) {
warn!(%err,"failed to start providing ceramic peers key");
}
Some(kad.get_providers(key.clone()))
} else {
None
}
}
}

#[cfg(test)]
Expand Down
42 changes: 3 additions & 39 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use libp2p::{
identity::Keypair,
kad::{
self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, QueryId,
QueryResult, RecordKey,
QueryResult,
},
mdns,
metrics::Recorder,
multiaddr::Protocol,
swarm::{dial_opts::DialOpts, ConnectionHandler, DialError, NetworkBehaviour, SwarmEvent},
swarm::{dial_opts::DialOpts, ConnectionHandler, NetworkBehaviour, SwarmEvent},
PeerId, StreamProtocol, Swarm,
};
use sqlx::SqlitePool;
Expand Down Expand Up @@ -81,9 +81,6 @@ where
providers: Providers,
listen_addrs: Vec<Multiaddr>,

ceramic_peers_key: RecordKey,
ceramic_peers_query_id: Option<QueryId>,

trust_observed_addrs: bool,
}

Expand Down Expand Up @@ -118,7 +115,6 @@ pub(crate) const DEFAULT_PROVIDER_LIMIT: usize = 10;
const NICE_INTERVAL: Duration = Duration::from_secs(6);
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
const EXPIRY_INTERVAL: Duration = Duration::from_secs(1);
const DISCOVER_INTERVAL: Duration = Duration::from_secs(5 * 60);

impl<I, M> Drop for Node<I, M>
where
Expand Down Expand Up @@ -147,7 +143,6 @@ where
rpc_addr: P2pAddr,
keypair: Keypair,
recons: Option<(I, M)>,
ceramic_peers_key: impl AsRef<[u8]>,
sql_pool: SqlitePool,
) -> Result<Self> {
let (network_sender_in, network_receiver_in) = channel(1024); // TODO: configurable
Expand Down Expand Up @@ -196,8 +191,6 @@ where
bitswap_sessions: Default::default(),
providers: Providers::new(4),
listen_addrs,
ceramic_peers_key: RecordKey::new(&ceramic_peers_key),
ceramic_peers_query_id: None,
trust_observed_addrs: libp2p_config.trust_observed_addrs,
})
}
Expand All @@ -218,7 +211,6 @@ where
let mut nice_interval = self.use_dht.then(|| tokio::time::interval(NICE_INTERVAL));
let mut bootstrap_interval = tokio::time::interval(BOOTSTRAP_INTERVAL);
let mut expiry_interval = tokio::time::interval(EXPIRY_INTERVAL);
let mut discover_interval = tokio::time::interval(DISCOVER_INTERVAL);

loop {
inc!(P2PMetrics::LoopCounter);
Expand Down Expand Up @@ -278,9 +270,6 @@ where
warn!("expiry error {:?}", err);
}
}
_ = discover_interval.tick() => {
self.ceramic_peers_query_id = self.swarm.behaviour_mut().discover_ceramic_peers(&self.ceramic_peers_key);
}
}
}
}
Expand Down Expand Up @@ -541,31 +530,7 @@ where
})
.collect();

if self
.ceramic_peers_query_id
.map(|i| i == id)
.unwrap_or_default()
{
let local_peer_id = *self.swarm.local_peer_id();
let mut providers = providers
.into_iter()
.filter(|peer| *peer != local_peer_id);
info!(
peers.count = providers.by_ref().count(),
"discovered ceramic peers"
);
providers.for_each(|peer| {
debug!(?peer, "dialing ceramic peer");
if let Err(err) = self.swarm.dial(peer) {
if !matches!(
err,
DialError::DialPeerConditionFalse(_)
) {
warn!(%err, "failed to dial ceramic peer")
}
}
});
} else if let Some(kad) = behaviour.kad.as_mut() {
if let Some(kad) = behaviour.kad.as_mut() {
debug!(
"provider results for {:?} last: {}",
key, step.last
Expand Down Expand Up @@ -1359,7 +1324,6 @@ mod tests {
rpc_server_addr,
keypair.into(),
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>,
"/ceramic-test",
sql_pool,
)
.await?;
Expand Down

0 comments on commit 953e09b

Please sign in to comment.