Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net)!: Create our own connection struct #2768

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 27 additions & 89 deletions iroh-blobs/examples/connect/mod.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,32 @@
//! Common code used to created quinn connections in the examples
use anyhow::{bail, Context, Result};
use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig};
use std::{path::PathBuf, sync::Arc};
use tokio::fs;
//! Common code used to created connections in the examples

pub const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";

// Path where the tls certificates are saved. This example expects that you have run the `provide-bytes` example first, which generates the certificates.
pub const CERT_PATH: &str = "./certs";

// derived from `quinn/examples/client.rs`
// load the certificates from CERT_PATH
// Assumes that you have already run the `provide-bytes` example, that generates the certificates
#[allow(unused)]
pub async fn load_certs() -> Result<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
let path = PathBuf::from(CERT_PATH).join("cert.der");
match fs::read(path).await {
Ok(cert) => {
roots.add(rustls::pki_types::CertificateDer::from(cert))?;
}
Err(e) => {
bail!("failed to open local server certificate: {}\nYou must run the `provide-bytes` example to create the certificate.\n\tcargo run --example provide-bytes", e);
}
}
Ok(roots)
}
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use iroh_net::discovery::dns::DnsDiscovery;
use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery;
use iroh_net::discovery::pkarr::PkarrPublisher;
use iroh_net::discovery::ConcurrentDiscovery;
use iroh_net::key::SecretKey;

// derived from `quinn/examples/server.rs`
// creates a self signed certificate and saves it to "./certs"
#[allow(unused)]
pub async fn make_and_write_certs() -> Result<(
rustls::pki_types::PrivateKeyDer<'static>,
rustls::pki_types::CertificateDer<'static>,
)> {
let path = std::path::PathBuf::from(CERT_PATH);
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
let key_path = path.join("key.der");
let cert_path = path.join("cert.der");
pub const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";

let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().unwrap();
tokio::fs::create_dir_all(path)
.await
.context("failed to create certificate directory")?;
tokio::fs::write(cert_path, &cert)
.await
.context("failed to write certificate")?;
tokio::fs::write(key_path, &key)
pub async fn make_iroh_endpoint() -> Result<iroh_net::Endpoint> {
let secret_key = SecretKey::generate();
let discovery = ConcurrentDiscovery::from_services(vec![
Box::new(PkarrPublisher::n0_dns(secret_key.clone())),
Box::new(DnsDiscovery::n0_dns()),
Box::new(LocalSwarmDiscovery::new(secret_key.public())?),
]);
let ep = iroh_net::Endpoint::builder()
.secret_key(secret_key)
.discovery(Box::new(discovery))
.alpns(vec![EXAMPLE_ALPN.to_vec()])
.bind()
.await?;
// Wait for full connectivity
ep.direct_addresses()
.next()
.await
.context("failed to write private key")?;

Ok((
rustls::pki_types::PrivateKeyDer::try_from(key).unwrap(),
rustls::pki_types::CertificateDer::from(cert),
))
}

// derived from `quinn/examples/client.rs`
// Creates a client quinn::Endpoint
#[allow(unused)]
pub fn make_client_endpoint(roots: rustls::RootCertStore) -> Result<quinn::Endpoint> {
let mut client_crypto = rustls::ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();

client_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];
let client_config: QuicClientConfig = client_crypto.try_into()?;
let client_config = quinn::ClientConfig::new(Arc::new(client_config));
let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
endpoint.set_default_client_config(client_config);
Ok(endpoint)
}

// derived from `quinn/examples/server.rs`
// makes a quinn server endpoint
#[allow(unused)]
pub fn make_server_endpoint(
key: rustls::pki_types::PrivateKeyDer<'static>,
cert: rustls::pki_types::CertificateDer<'static>,
) -> Result<quinn::Endpoint> {
let mut server_crypto = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert], key)?;
server_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];
let server_config: QuicServerConfig = server_crypto.try_into()?;
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_config));
let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
transport_config.max_concurrent_uni_streams(0_u8.into());

let endpoint = quinn::Endpoint::server(server_config, "[::1]:4433".parse()?)?;
Ok(endpoint)
.context("no direct addrs")?;
Ok(ep)
}
26 changes: 11 additions & 15 deletions iroh-blobs/examples/fetch-fsm.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
//! An example how to download a single blob or collection from a node and write it to stdout using the `get` finite state machine directly.
//!
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;

use anyhow::{Context, Result};
use iroh_io::ConcatenateSliceWriter;
use iroh_net::ticket::NodeTicket;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_blobs::{
Expand All @@ -17,7 +15,7 @@ use iroh_blobs::{
};

mod connect;
use connect::{load_certs, make_client_endpoint};
use connect::{make_iroh_endpoint, EXAMPLE_ALPN};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand All @@ -34,10 +32,10 @@ async fn main() -> Result<()> {
setup_logging();
let args: Vec<_> = std::env::args().collect();
if args.len() != 4 {
anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
anyhow::bail!("usage: fetch-bytes HASH NODE_TICKET FORMAT");
}
let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
let hash: Hash = args[1].parse().context("unable to parse HASH")?;
let ticket: NodeTicket = args[2].parse().context("unable to parse NODE_TICKET")?;
let format = {
if args[3] != "blob" && args[3] != "collection" {
anyhow::bail!(
Expand All @@ -48,17 +46,15 @@ async fn main() -> Result<()> {
args[3].clone()
};

// load tls certificates
// This will error if you have not run the `provide-bytes` example
let roots = load_certs().await?;

// create an endpoint to listen for incoming connections
let endpoint = make_client_endpoint(roots)?;
println!("\nlistening on {}", endpoint.local_addr()?);
println!("fetching hash {hash} from {addr}");
let endpoint = make_iroh_endpoint().await?;
println!("\nlistening as NodeId {}", endpoint.node_id());
println!("fetching hash {hash} from {ticket}");

// connect
let connection = endpoint.connect(addr, "localhost")?.await?;
let connection = endpoint
.connect(ticket.node_addr().clone(), EXAMPLE_ALPN)
.await?;

if format == "collection" {
// create a request for a collection
Expand Down
23 changes: 11 additions & 12 deletions iroh-blobs/examples/fetch-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;

use anyhow::{Context, Result};
use connect::EXAMPLE_ALPN;
use iroh_net::ticket::NodeTicket;
use tracing_subscriber::{prelude::*, EnvFilter};

use std::io;
Expand All @@ -25,7 +26,7 @@ use iroh_blobs::{
};

mod connect;
use connect::{load_certs, make_client_endpoint};
use connect::make_iroh_endpoint;

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand All @@ -42,10 +43,10 @@ async fn main() -> Result<()> {
setup_logging();
let args: Vec<_> = std::env::args().collect();
if args.len() != 4 {
anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
anyhow::bail!("usage: fetch-bytes HASH NODE_TICKET FORMAT");
}
let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
let ticket: NodeTicket = args[2].parse().context("unable to parse NODE_TICKET")?;
let format = {
if args[3] != "blob" && args[3] != "collection" {
anyhow::bail!(
Expand All @@ -56,17 +57,15 @@ async fn main() -> Result<()> {
args[3].clone()
};

// load tls certificates
// This will error if you have not run the `provide-bytes` example
let roots = load_certs().await?;

// create an endpoint to listen for incoming connections
let endpoint = make_client_endpoint(roots)?;
println!("\nlistening on {}", endpoint.local_addr()?);
println!("fetching hash {hash} from {addr}");
let endpoint = make_iroh_endpoint().await?;
println!("\nlistening as NodeId {}", endpoint.node_id());
println!("fetching hash {hash} from {ticket}");

// connect
let connection = endpoint.connect(addr, "localhost")?.await?;
let connection = endpoint
.connect(ticket.node_addr().clone(), EXAMPLE_ALPN)
.await?;

let mut stream = if format == "collection" {
// create a request for a collection
Expand Down
24 changes: 11 additions & 13 deletions iroh-blobs/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run this example with
//! cargo run --example provide-bytes blob
//! cargo run --all-features --example provide-bytes blob
//! To provide a blob (single file)
//!
//! Run this example with
//! cargo run --example provide-bytes collection
//! cargo run --all-features --example provide-bytes collection
//! To provide a collection (multiple blobs)
use anyhow::Result;
use iroh_net::ticket::NodeTicket;
use tracing::warn;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_blobs::{format::collection::Collection, util::local_pool::LocalPool, Hash};

mod connect;
use connect::{make_and_write_certs, make_server_endpoint, CERT_PATH};

use connect::make_iroh_endpoint;

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand All @@ -32,7 +34,7 @@ async fn main() -> Result<()> {
let args: Vec<_> = std::env::args().collect();
if args.len() != 2 {
anyhow::bail!(
"usage: provide-bytes [FORMAT], where [FORMAT] is either 'blob' or 'collection'\n\nThe 'blob' example demonstrates sending a single blob of bytes. The 'collection' example demonstrates sending multiple blobs of bytes, grouped together in a 'collection'."
"usage: provide-bytes FORMAT, where FORMAT is either 'blob' or 'collection'\n\nThe 'blob' example demonstrates sending a single blob of bytes. The 'collection' example demonstrates sending multiple blobs of bytes, grouped together in a 'collection'."
);
}
let format = {
Expand Down Expand Up @@ -68,17 +70,14 @@ async fn main() -> Result<()> {
(db, Hash::from(hash.as_bytes()))
};

// create tls certs and save to CERT_PATH
let (key, cert) = make_and_write_certs().await?;

// create an endpoint to listen for incoming connections
let endpoint = make_server_endpoint(key, cert)?;
let addr = endpoint.local_addr()?;
println!("\nlistening on {addr}");
let endpoint = make_iroh_endpoint().await?;
println!("providing hash {hash}");
let node_ticket = NodeTicket::new(endpoint.node_addr().await?)?;
println!("Node ticket: {node_ticket}");

println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {hash} \"{addr}\" {format}");
println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {hash} \"{addr}\" {format}\n");
println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --all-features --example fetch-fsm {hash} \"{node_ticket}\" {format}");
println!("\nfetch the content using a stream by running the following example:\n\ncargo run --all-features --example fetch-stream {hash} \"{node_ticket}\" {format}\n");

// create a new local pool handle with 1 worker thread
let lp = LocalPool::single();
Expand Down Expand Up @@ -116,7 +115,6 @@ async fn main() -> Result<()> {

match tokio::signal::ctrl_c().await {
Ok(()) => {
tokio::fs::remove_dir_all(std::path::PathBuf::from(CERT_PATH)).await?;
accept_task.abort();
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ async fn accept(
match connecting.await {
Ok(connection) => {
if n == 0 {
let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection) else {
let Ok(remote_peer_id) = connection.remote_node_id() else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let Ok(remote_node_id)

return;
};
println!("Accepted connection from {}", remote_peer_id);
Expand Down
4 changes: 2 additions & 2 deletions iroh-docs/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, Instant},
};

use iroh_net::{endpoint::get_remote_node_id, key::PublicKey, Endpoint, NodeAddr};
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
use serde::{Deserialize, Serialize};
use tracing::{debug, error_span, trace, Instrument};

Expand Down Expand Up @@ -116,7 +116,7 @@ where
{
let t_start = Instant::now();
let connection = connecting.await.map_err(AcceptError::connect)?;
let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
let peer = connection.remote_node_id().map_err(AcceptError::connect)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node_id?

let (mut send_stream, mut recv_stream) = connection
.accept_bi()
.await
Expand Down
2 changes: 1 addition & 1 deletion iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn handle_connection(
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
let peer_id = conn.remote_node_id()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this node_id while we're at it?

match alpn.as_ref() {
GOSSIP_ALPN => gossip.handle_connection(conn).await.context(format!(
"connection to {peer_id} with ALPN {} failed",
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_util::TryFutureExt;
use iroh_metrics::inc;
use iroh_net::{
dialer::Dialer,
endpoint::{get_remote_node_id, Connection, DirectAddr},
endpoint::{Connection, DirectAddr},
key::PublicKey,
AddrInfo, Endpoint, NodeAddr, NodeId,
};
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Gossip {
///
/// Make sure to check the ALPN protocol yourself before passing the connection.
pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> {
let peer_id = get_remote_node_id(&conn)?;
let peer_id = conn.remote_node_id()?;
self.send(ToActor::HandleConnection(peer_id, ConnOrigin::Accept, conn))
.await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/examples/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::str::FromStr;

use clap::Parser;
use iroh_net::{endpoint::get_remote_node_id, Endpoint, NodeId};
use iroh_net::{Endpoint, NodeId};
use tracing::warn;
use url::Url;

Expand Down Expand Up @@ -88,7 +88,7 @@ async fn chat_server(args: Args) -> anyhow::Result<()> {
};
tokio::spawn(async move {
let connection = connecting.await?;
let remote_node_id = get_remote_node_id(&connection)?;
let remote_node_id = connection.remote_node_id()?;
println!("got connection from {}", remote_node_id);
// just leave the tasks hanging. this is just an example.
let (mut writer, mut reader) = connection.accept_bi().await?;
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
};
let alpn = connecting.alpn().await?;
let conn = connecting.await?;
let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
let node_id = conn.remote_node_id()?;
info!(
"new (unreliable) connection from {node_id} with ALPN {} (coming from {})",
String::from_utf8_lossy(&alpn),
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn main() -> anyhow::Result<()> {
};
let alpn = connecting.alpn().await?;
let conn = connecting.await?;
let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
let node_id = conn.remote_node_id()?;
info!(
"new connection from {node_id} with ALPN {} (coming from {})",
String::from_utf8_lossy(&alpn),
Expand Down
Loading
Loading