Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove Ceramic peer discovery #161

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,7 @@ impl Daemon {
None
};
let ipfs = Ipfs::builder()
.with_p2p(
p2p_config,
keypair,
recons,
&network.name(),
sql_pool.clone(),
)
.with_p2p(p2p_config, keypair, recons, sql_pool.clone())
.await?
.build(sql_pool.clone())
.await?;
Expand Down
11 changes: 1 addition & 10 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Builder<Init> {
libp2p_config: Libp2pConfig,
keypair: Keypair,
recons: Option<(I, M)>,
ceramic_peers_key: &str,
sql_pool: SqlitePool,
) -> anyhow::Result<Builder<WithP2p>>
where
Expand All @@ -52,15 +51,7 @@ impl Builder<Init> {

config.libp2p = libp2p_config;

let mut p2p = Node::new(
config,
addr.clone(),
keypair,
recons,
ceramic_peers_key,
sql_pool,
)
.await?;
let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool).await?;

let task = task::spawn(async move {
if let Err(err) = p2p.run().await {
Expand Down
40 changes: 14 additions & 26 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ use ceramic_core::{EventId, Interest};
use cid::Cid;
use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig, Store};
use iroh_rpc_client::Client;
use libp2p::gossipsub::{self, MessageAuthenticity};
use libp2p::identify;
use libp2p::kad::{
self,
store::{MemoryStore, MemoryStoreConfig},
RecordKey,
};
use libp2p::mdns::tokio::Behaviour as Mdns;
use libp2p::multiaddr::Protocol;
use libp2p::ping::Behaviour as Ping;
use libp2p::relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{autonat, dcutr};
use libp2p::{
autonat,
connection_limits::{self, ConnectionLimits},
kad::QueryId,
dcutr,
gossipsub::{self, MessageAuthenticity},
identify,
kad::{
self,
store::{MemoryStore, MemoryStoreConfig},
},
mdns::tokio::Behaviour as Mdns,
multiaddr::Protocol,
ping::Behaviour as Ping,
relay,
swarm::behaviour::toggle::Toggle,
swarm::NetworkBehaviour,
};
use libp2p_identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
Expand Down Expand Up @@ -266,17 +265,6 @@ where
}
Ok(())
}
pub fn discover_ceramic_peers(&mut self, key: &RecordKey) -> Option<QueryId> {
info!(?key, "discovering Ceramic peers");
if let Some(kad) = self.kad.as_mut() {
if let Err(err) = kad.start_providing(key.clone()) {
warn!(%err,"failed to start providing ceramic peers key");
}
Some(kad.get_providers(key.clone()))
} else {
None
}
}
}

#[cfg(test)]
Expand Down
42 changes: 3 additions & 39 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use libp2p::{
identity::Keypair,
kad::{
self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, QueryId,
QueryResult, RecordKey,
QueryResult,
},
mdns,
metrics::Recorder,
multiaddr::Protocol,
swarm::{dial_opts::DialOpts, ConnectionHandler, DialError, NetworkBehaviour, SwarmEvent},
swarm::{dial_opts::DialOpts, ConnectionHandler, NetworkBehaviour, SwarmEvent},
PeerId, StreamProtocol, Swarm,
};
use sqlx::SqlitePool;
Expand Down Expand Up @@ -81,9 +81,6 @@ where
providers: Providers,
listen_addrs: Vec<Multiaddr>,

ceramic_peers_key: RecordKey,
ceramic_peers_query_id: Option<QueryId>,

trust_observed_addrs: bool,
}

Expand Down Expand Up @@ -118,7 +115,6 @@ pub(crate) const DEFAULT_PROVIDER_LIMIT: usize = 10;
const NICE_INTERVAL: Duration = Duration::from_secs(6);
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
const EXPIRY_INTERVAL: Duration = Duration::from_secs(1);
const DISCOVER_INTERVAL: Duration = Duration::from_secs(5 * 60);

impl<I, M> Drop for Node<I, M>
where
Expand Down Expand Up @@ -147,7 +143,6 @@ where
rpc_addr: P2pAddr,
keypair: Keypair,
recons: Option<(I, M)>,
ceramic_peers_key: impl AsRef<[u8]>,
sql_pool: SqlitePool,
) -> Result<Self> {
let (network_sender_in, network_receiver_in) = channel(1024); // TODO: configurable
Expand Down Expand Up @@ -196,8 +191,6 @@ where
bitswap_sessions: Default::default(),
providers: Providers::new(4),
listen_addrs,
ceramic_peers_key: RecordKey::new(&ceramic_peers_key),
ceramic_peers_query_id: None,
trust_observed_addrs: libp2p_config.trust_observed_addrs,
})
}
Expand All @@ -218,7 +211,6 @@ where
let mut nice_interval = self.use_dht.then(|| tokio::time::interval(NICE_INTERVAL));
let mut bootstrap_interval = tokio::time::interval(BOOTSTRAP_INTERVAL);
let mut expiry_interval = tokio::time::interval(EXPIRY_INTERVAL);
let mut discover_interval = tokio::time::interval(DISCOVER_INTERVAL);

loop {
inc!(P2PMetrics::LoopCounter);
Expand Down Expand Up @@ -278,9 +270,6 @@ where
warn!("expiry error {:?}", err);
}
}
_ = discover_interval.tick() => {
self.ceramic_peers_query_id = self.swarm.behaviour_mut().discover_ceramic_peers(&self.ceramic_peers_key);
}
}
}
}
Expand Down Expand Up @@ -541,31 +530,7 @@ where
})
.collect();

if self
.ceramic_peers_query_id
.map(|i| i == id)
.unwrap_or_default()
{
let local_peer_id = *self.swarm.local_peer_id();
let mut providers = providers
.into_iter()
.filter(|peer| *peer != local_peer_id);
info!(
peers.count = providers.by_ref().count(),
"discovered ceramic peers"
);
providers.for_each(|peer| {
debug!(?peer, "dialing ceramic peer");
if let Err(err) = self.swarm.dial(peer) {
if !matches!(
err,
DialError::DialPeerConditionFalse(_)
) {
warn!(%err, "failed to dial ceramic peer")
}
}
});
} else if let Some(kad) = behaviour.kad.as_mut() {
if let Some(kad) = behaviour.kad.as_mut() {
debug!(
"provider results for {:?} last: {}",
key, step.last
Expand Down Expand Up @@ -1359,7 +1324,6 @@ mod tests {
rpc_server_addr,
keypair.into(),
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>,
"/ceramic-test",
sql_pool,
)
.await?;
Expand Down
Loading