diff --git a/iroh-blobs/examples/connect/mod.rs b/iroh-blobs/examples/connect/mod.rs
index 4191275a46..d667add0eb 100644
--- a/iroh-blobs/examples/connect/mod.rs
+++ b/iroh-blobs/examples/connect/mod.rs
@@ -1,94 +1,32 @@
-//! Common code used to created quinn connections in the examples
-use anyhow::{bail, Context, Result};
-use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig};
-use std::{path::PathBuf, sync::Arc};
-use tokio::fs;
+//! Common code used to create connections in the examples
 
-pub const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";
-
-// Path where the tls certificates are saved. This example expects that you have run the `provide-bytes` example first, which generates the certificates.
-pub const CERT_PATH: &str = "./certs";
-
-// derived from `quinn/examples/client.rs`
-// load the certificates from CERT_PATH
-// Assumes that you have already run the `provide-bytes` example, that generates the certificates
-#[allow(unused)]
-pub async fn load_certs() -> Result<rustls::RootCertStore> {
-    let mut roots = rustls::RootCertStore::empty();
-    let path = PathBuf::from(CERT_PATH).join("cert.der");
-    match fs::read(path).await {
-        Ok(cert) => {
-            roots.add(rustls::pki_types::CertificateDer::from(cert))?;
-        }
-        Err(e) => {
-            bail!("failed to open local server certificate: {}\nYou must run the `provide-bytes` example to create the certificate.\n\tcargo run --example provide-bytes", e);
-        }
-    }
-    Ok(roots)
-}
+use anyhow::{Context, Result};
+use futures_lite::StreamExt;
+use iroh_net::discovery::dns::DnsDiscovery;
+use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery;
+use iroh_net::discovery::pkarr::PkarrPublisher;
+use iroh_net::discovery::ConcurrentDiscovery;
+use iroh_net::key::SecretKey;
 
-// derived from `quinn/examples/server.rs`
-// creates a self signed certificate and saves it to "./certs"
-#[allow(unused)]
-pub async fn make_and_write_certs() -> Result<(
-    rustls::pki_types::PrivateKeyDer<'static>,
-    rustls::pki_types::CertificateDer<'static>,
-)> {
-    let path = std::path::PathBuf::from(CERT_PATH);
-    let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
-    let key_path = path.join("key.der");
-    let cert_path = path.join("cert.der");
+pub const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";
 
-    let key = cert.serialize_private_key_der();
-    let cert = cert.serialize_der().unwrap();
-    tokio::fs::create_dir_all(path)
-        .await
-        .context("failed to create certificate directory")?;
-    tokio::fs::write(cert_path, &cert)
-        .await
-        .context("failed to write certificate")?;
-    tokio::fs::write(key_path, &key)
+pub async fn make_iroh_endpoint() -> Result<iroh_net::Endpoint> {
+    let secret_key = SecretKey::generate();
+    let discovery = ConcurrentDiscovery::from_services(vec![
+        Box::new(PkarrPublisher::n0_dns(secret_key.clone())),
+        Box::new(DnsDiscovery::n0_dns()),
+        Box::new(LocalSwarmDiscovery::new(secret_key.public())?),
+    ]);
+    let ep = iroh_net::Endpoint::builder()
+        .secret_key(secret_key)
+        .discovery(Box::new(discovery))
+        .alpns(vec![EXAMPLE_ALPN.to_vec()])
+        .bind()
+        .await?;
+    // Wait for full connectivity
+    ep.direct_addresses()
+        .next()
         .await
-        .context("failed to write private key")?;
-
-    Ok((
-        rustls::pki_types::PrivateKeyDer::try_from(key).unwrap(),
-        rustls::pki_types::CertificateDer::from(cert),
-    ))
-}
-
-// derived from `quinn/examples/client.rs`
-// Creates a client quinn::Endpoint
-#[allow(unused)]
-pub fn make_client_endpoint(roots: rustls::RootCertStore) -> Result<quinn::Endpoint> {
-    let mut client_crypto = rustls::ClientConfig::builder()
-        .with_root_certificates(roots)
-        .with_no_client_auth();
-
-    client_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];
-    let client_config: QuicClientConfig = client_crypto.try_into()?;
-    let client_config = quinn::ClientConfig::new(Arc::new(client_config));
-    let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
-    endpoint.set_default_client_config(client_config);
-    Ok(endpoint)
-}
-
-// derived from `quinn/examples/server.rs`
-// makes a quinn server endpoint
-#[allow(unused)]
-pub fn make_server_endpoint(
-    key: rustls::pki_types::PrivateKeyDer<'static>,
-    cert: rustls::pki_types::CertificateDer<'static>,
-) -> Result<quinn::Endpoint> {
-    let mut server_crypto = rustls::ServerConfig::builder()
-        .with_no_client_auth()
-        .with_single_cert(vec![cert], key)?;
-    server_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];
-    let server_config: QuicServerConfig = server_crypto.try_into()?;
-    let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_config));
-    let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
-    transport_config.max_concurrent_uni_streams(0_u8.into());
-
-    let endpoint = quinn::Endpoint::server(server_config, "[::1]:4433".parse()?)?;
-    Ok(endpoint)
+        .context("no direct addrs")?;
+    Ok(ep)
 }
diff --git a/iroh-blobs/examples/fetch-fsm.rs b/iroh-blobs/examples/fetch-fsm.rs
index 9ff063af55..51018b1311 100644
--- a/iroh-blobs/examples/fetch-fsm.rs
+++ b/iroh-blobs/examples/fetch-fsm.rs
@@ -1,12 +1,10 @@
 //! An example how to download a single blob or collection from a node and write it to stdout using the `get` finite state machine directly.
 //!
-//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
-//!
 //! Run the provide-bytes example first. It will give instructions on how to run this example properly.
 
-use std::net::SocketAddr;
 use anyhow::{Context, Result};
 use iroh_io::ConcatenateSliceWriter;
+use iroh_net::ticket::NodeTicket;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 use iroh_blobs::{
@@ -17,7 +15,7 @@ use iroh_blobs::{
 };
 
 mod connect;
-use connect::{load_certs, make_client_endpoint};
+use connect::{make_iroh_endpoint, EXAMPLE_ALPN};
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
 pub fn setup_logging() {
@@ -34,10 +32,10 @@ async fn main() -> Result<()> {
     setup_logging();
     let args: Vec<_> = std::env::args().collect();
     if args.len() != 4 {
-        anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
+        anyhow::bail!("usage: fetch-bytes HASH NODE_TICKET FORMAT");
     }
-    let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
-    let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
+    let hash: Hash = args[1].parse().context("unable to parse HASH")?;
+    let ticket: NodeTicket = args[2].parse().context("unable to parse NODE_TICKET")?;
     let format = {
         if args[3] != "blob" && args[3] != "collection" {
             anyhow::bail!(
@@ -48,17 +46,15 @@ async fn main() -> Result<()> {
         args[3].clone()
     };
 
-    // load tls certificates
-    // This will error if you have not run the `provide-bytes` example
-    let roots = load_certs().await?;
-
     // create an endpoint to listen for incoming connections
-    let endpoint = make_client_endpoint(roots)?;
-    println!("\nlistening on {}", endpoint.local_addr()?);
-    println!("fetching hash {hash} from {addr}");
+    let endpoint = make_iroh_endpoint().await?;
+    println!("\nlistening as NodeId {}", endpoint.node_id());
+    println!("fetching hash {hash} from {ticket}");
 
     // connect
-    let connection = endpoint.connect(addr, "localhost")?.await?;
+    let connection = endpoint
+        .connect(ticket.node_addr().clone(), EXAMPLE_ALPN)
+        .await?;
 
     if format == "collection" {
         // create a request for a collection
diff --git a/iroh-blobs/examples/fetch-stream.rs b/iroh-blobs/examples/fetch-stream.rs
index ac064d8d8b..37d94ea71c 100644
--- a/iroh-blobs/examples/fetch-stream.rs
+++ b/iroh-blobs/examples/fetch-stream.rs
@@ -3,9 +3,10 @@
 //! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
 //!
 //! Run the provide-bytes example first. It will give instructions on how to run this example properly.
 
-use std::net::SocketAddr;
 use anyhow::{Context, Result};
+use connect::EXAMPLE_ALPN;
+use iroh_net::ticket::NodeTicket;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 use std::io;
@@ -25,7 +26,7 @@ use iroh_blobs::{
 };
 
 mod connect;
-use connect::{load_certs, make_client_endpoint};
+use connect::make_iroh_endpoint;
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
 pub fn setup_logging() {
@@ -42,10 +43,10 @@ async fn main() -> Result<()> {
     setup_logging();
     let args: Vec<_> = std::env::args().collect();
    if args.len() != 4 {
-        anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
+        anyhow::bail!("usage: fetch-bytes HASH NODE_TICKET FORMAT");
     }
     let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
-    let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
+    let ticket: NodeTicket = args[2].parse().context("unable to parse NODE_TICKET")?;
     let format = {
         if args[3] != "blob" && args[3] != "collection" {
             anyhow::bail!(
@@ -56,17 +57,15 @@ async fn main() -> Result<()> {
         args[3].clone()
     };
 
-    // load tls certificates
-    // This will error if you have not run the `provide-bytes` example
-    let roots = load_certs().await?;
-
     // create an endpoint to listen for incoming connections
-    let endpoint = make_client_endpoint(roots)?;
-    println!("\nlistening on {}", endpoint.local_addr()?);
-    println!("fetching hash {hash} from {addr}");
+    let endpoint = make_iroh_endpoint().await?;
+    println!("\nlistening as NodeId {}", endpoint.node_id());
+    println!("fetching hash {hash} from {ticket}");
 
     // connect
-    let connection = endpoint.connect(addr, "localhost")?.await?;
+    let connection = endpoint
+        .connect(ticket.node_addr().clone(), EXAMPLE_ALPN)
+        .await?;
 
     let mut stream = if format == "collection" {
         // create a request for a collection
diff --git a/iroh-blobs/examples/provide-bytes.rs b/iroh-blobs/examples/provide-bytes.rs
index f916dd467a..094b151f24 100644
--- a/iroh-blobs/examples/provide-bytes.rs
+++ b/iroh-blobs/examples/provide-bytes.rs
@@ -3,20 +3,22 @@
 //! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
 //!
 //! Run this example with
-//! cargo run --example provide-bytes blob
+//! cargo run --all-features --example provide-bytes blob
 //! To provide a blob (single file)
 //!
 //! Run this example with
-//! cargo run --example provide-bytes collection
+//! cargo run --all-features --example provide-bytes collection
 //! To provide a collection (multiple blobs)
 use anyhow::Result;
+use iroh_net::ticket::NodeTicket;
 use tracing::warn;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 use iroh_blobs::{format::collection::Collection, util::local_pool::LocalPool, Hash};
 
 mod connect;
-use connect::{make_and_write_certs, make_server_endpoint, CERT_PATH};
+
+use connect::make_iroh_endpoint;
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
 pub fn setup_logging() {
@@ -32,7 +34,7 @@ async fn main() -> Result<()> {
     let args: Vec<_> = std::env::args().collect();
     if args.len() != 2 {
         anyhow::bail!(
-            "usage: provide-bytes [FORMAT], where [FORMAT] is either 'blob' or 'collection'\n\nThe 'blob' example demonstrates sending a single blob of bytes. The 'collection' example demonstrates sending multiple blobs of bytes, grouped together in a 'collection'."
+            "usage: provide-bytes FORMAT, where FORMAT is either 'blob' or 'collection'\n\nThe 'blob' example demonstrates sending a single blob of bytes. The 'collection' example demonstrates sending multiple blobs of bytes, grouped together in a 'collection'."
         );
     }
     let format = {
@@ -68,17 +70,14 @@ async fn main() -> Result<()> {
         (db, Hash::from(hash.as_bytes()))
     };
 
-    // create tls certs and save to CERT_PATH
-    let (key, cert) = make_and_write_certs().await?;
-
     // create an endpoint to listen for incoming connections
-    let endpoint = make_server_endpoint(key, cert)?;
-    let addr = endpoint.local_addr()?;
-    println!("\nlistening on {addr}");
+    let endpoint = make_iroh_endpoint().await?;
     println!("providing hash {hash}");
+    let node_ticket = NodeTicket::new(endpoint.node_addr().await?)?;
+    println!("Node ticket: {node_ticket}");
 
-    println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {hash} \"{addr}\" {format}");
-    println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {hash} \"{addr}\" {format}\n");
+    println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --all-features --example fetch-fsm {hash} \"{node_ticket}\" {format}");
+    println!("\nfetch the content using a stream by running the following example:\n\ncargo run --all-features --example fetch-stream {hash} \"{node_ticket}\" {format}\n");
 
     // create a new local pool handle with 1 worker thread
     let lp = LocalPool::single();
@@ -116,7 +115,6 @@ async fn main() -> Result<()> {
 
     match tokio::signal::ctrl_c().await {
         Ok(()) => {
-            tokio::fs::remove_dir_all(std::path::PathBuf::from(CERT_PATH)).await?;
             accept_task.abort();
             Ok(())
         }
diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs
index 706f300daf..b6a50a8034 100644
--- a/iroh-cli/src/commands/doctor.rs
+++ b/iroh-cli/src/commands/doctor.rs
@@ -792,7 +792,7 @@ async fn accept(
     match connecting.await {
         Ok(connection) => {
             if n == 0 {
-                let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection) else {
+                let Ok(remote_peer_id) = connection.remote_node_id() else {
                    return;
                 };
                 println!("Accepted connection from {}", remote_peer_id);
diff --git a/iroh-docs/src/net.rs b/iroh-docs/src/net.rs
index 5879fdf7e0..3e8418154d 100644
--- a/iroh-docs/src/net.rs
+++ b/iroh-docs/src/net.rs
@@ -5,7 +5,7 @@ use std::{
     time::{Duration, Instant},
 };
 
-use iroh_net::{endpoint::get_remote_node_id, key::PublicKey, Endpoint, NodeAddr};
+use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
 use serde::{Deserialize, Serialize};
 use tracing::{debug, error_span, trace, Instrument};
 
@@ -116,7 +116,7 @@ where
 {
     let t_start = Instant::now();
     let connection = connecting.await.map_err(AcceptError::connect)?;
-    let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
+    let peer = connection.remote_node_id().map_err(AcceptError::connect)?;
     let (mut send_stream, mut recv_stream) = connection
         .accept_bi()
         .await
diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs
index 5d99ba10e6..c294bc8410 100644
--- a/iroh-gossip/examples/chat.rs
+++ b/iroh-gossip/examples/chat.rs
@@ -221,7 +221,7 @@ async fn handle_connection(
 ) -> anyhow::Result<()> {
     let alpn = conn.alpn().await?;
     let conn = conn.await?;
-    let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
+    let peer_id = conn.remote_node_id()?;
     match alpn.as_ref() {
         GOSSIP_ALPN => gossip.handle_connection(conn).await.context(format!(
"connection to {peer_id} with ALPN {} failed", diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 88009f8904..377d6450fa 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -11,7 +11,7 @@ use futures_util::TryFutureExt; use iroh_metrics::inc; use iroh_net::{ dialer::Dialer, - endpoint::{get_remote_node_id, Connection, DirectAddr}, + endpoint::{Connection, DirectAddr}, key::PublicKey, AddrInfo, Endpoint, NodeAddr, NodeId, }; @@ -147,7 +147,7 @@ impl Gossip { /// /// Make sure to check the ALPN protocol yourself before passing the connection. pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> { - let peer_id = get_remote_node_id(&conn)?; + let peer_id = conn.remote_node_id()?; self.send(ToActor::HandleConnection(peer_id, ConnOrigin::Accept, conn)) .await?; Ok(()) diff --git a/iroh-net/examples/dht_discovery.rs b/iroh-net/examples/dht_discovery.rs index 1b78d9bce1..aea3b5c52e 100644 --- a/iroh-net/examples/dht_discovery.rs +++ b/iroh-net/examples/dht_discovery.rs @@ -11,7 +11,7 @@ use std::str::FromStr; use clap::Parser; -use iroh_net::{endpoint::get_remote_node_id, Endpoint, NodeId}; +use iroh_net::{Endpoint, NodeId}; use tracing::warn; use url::Url; @@ -88,7 +88,7 @@ async fn chat_server(args: Args) -> anyhow::Result<()> { }; tokio::spawn(async move { let connection = connecting.await?; - let remote_node_id = get_remote_node_id(&connection)?; + let remote_node_id = connection.remote_node_id()?; println!("got connection from {}", remote_node_id); // just leave the tasks hanging. this is just an example. let (mut writer, mut reader) = connection.accept_bi().await?; diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs index f54a5c8862..b013670d8a 100644 --- a/iroh-net/examples/listen-unreliable.rs +++ b/iroh-net/examples/listen-unreliable.rs @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> { }; let alpn = connecting.alpn().await?; let conn = connecting.await?; - let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?; + let node_id = conn.remote_node_id()?; info!( "new (unreliable) connection from {node_id} with ALPN {} (coming from {})", String::from_utf8_lossy(&alpn), diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs index af1a5a9949..94c63fa14b 100644 --- a/iroh-net/examples/listen.rs +++ b/iroh-net/examples/listen.rs @@ -78,7 +78,7 @@ async fn main() -> anyhow::Result<()> { }; let alpn = connecting.alpn().await?; let conn = connecting.await?; - let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?; + let node_id = conn.remote_node_id()?; info!( "new connection from {node_id} with ALPN {} (coming from {})", String::from_utf8_lossy(&alpn), diff --git a/iroh-net/src/dialer.rs b/iroh-net/src/dialer.rs index 8c37b08c08..1cb3ba6dd5 100644 --- a/iroh-net/src/dialer.rs +++ b/iroh-net/src/dialer.rs @@ -8,6 +8,7 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::error; +use crate::endpoint::Connection; use crate::{Endpoint, NodeId}; /// Dials nodes and maintains a queue of pending dials. @@ -19,7 +20,7 @@ use crate::{Endpoint, NodeId}; #[derive(Debug)] pub struct Dialer { endpoint: Endpoint, - pending: JoinSet<(NodeId, anyhow::Result)>, + pending: JoinSet<(NodeId, anyhow::Result)>, pending_dials: HashMap, } @@ -70,7 +71,7 @@ impl Dialer { } /// Waits for the next dial operation to complete. 
-    pub async fn next_conn(&mut self) -> (NodeId, anyhow::Result<quinn::Connection>) {
+    pub async fn next_conn(&mut self) -> (NodeId, anyhow::Result<Connection>) {
         match self.pending_dials.is_empty() {
             false => {
                 let (node_id, res) = loop {
@@ -107,7 +108,7 @@ impl Dialer {
 }
 
 impl Stream for Dialer {
-    type Item = (NodeId, anyhow::Result<quinn::Connection>);
+    type Item = (NodeId, anyhow::Result<Connection>);
 
     fn poll_next(
         mut self: Pin<&mut Self>,
diff --git a/iroh-net/src/endpoint.rs b/iroh-net/src/endpoint.rs
index 4607fefefa..582f108aa8 100644
--- a/iroh-net/src/endpoint.rs
+++ b/iroh-net/src/endpoint.rs
@@ -29,7 +29,7 @@ use url::Url;
 
 use crate::discovery::{Discovery, DiscoveryTask};
 use crate::dns::{default_resolver, DnsResolver};
-use crate::key::{PublicKey, SecretKey};
+use crate::key::SecretKey;
 use crate::magicsock::{self, Handle, QuicMappedAddr};
 use crate::relay::{RelayMode, RelayUrl};
 use crate::{tls, NodeId};
@@ -39,8 +39,9 @@ mod rtt_actor;
 use self::rtt_actor::RttMessage;
 
 pub use quinn::{
-    ApplicationClose, Connection, ConnectionClose, ConnectionError, ReadError, RecvStream,
-    RetryError, SendStream, ServerConfig, TransportConfig, VarInt, WriteError,
+    AcceptBi, AcceptUni, ApplicationClose, ConnectionClose, ConnectionError, ConnectionStats,
+    OpenBi, OpenUni, ReadDatagram, ReadError, RecvStream, RetryError, SendDatagramError,
+    SendStream, ServerConfig, TransportConfig, VarInt, WriteError, ZeroRttAccepted,
 };
 
 pub use super::magicsock::{
@@ -73,7 +74,7 @@ pub struct Builder {
     secret_key: Option<SecretKey>,
     relay_mode: RelayMode,
     alpn_protocols: Vec<Vec<u8>>,
-    transport_config: Option<quinn::TransportConfig>,
+    transport_config: Option<TransportConfig>,
     keylog: bool,
     discovery: Option<Box<dyn Discovery>>,
     proxy_url: Option<Url>,
@@ -168,9 +169,11 @@ impl Builder {
     /// Sets a secret key to authenticate with other peers.
     ///
     /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
-    /// also its [`NodeId`]
+    /// also its [`NodeId`].
     ///
     /// If not set, a new secret key will be generated.
+    ///
+    /// [`PublicKey`]: crate::key::PublicKey
     pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
         self.secret_key = Some(secret_key);
         self
@@ -231,7 +234,7 @@ impl Builder {
 
     // # Methods for more specialist customisation.
 
-    /// Sets a custom [`quinn::TransportConfig`] for this endpoint.
+    /// Sets a custom [`TransportConfig`] for this endpoint.
     ///
     /// The transport config contains parameters governing the QUIC state machine.
     ///
@@ -239,7 +242,7 @@ impl Builder {
     /// internet applications. Applications protocols which forbid remotely-initiated
     /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
     /// zero.
-    pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self {
+    pub fn transport_config(mut self, transport_config: TransportConfig) -> Self {
         self.transport_config = Some(transport_config);
         self
     }
@@ -301,13 +304,13 @@ impl Builder {
 #[derive(Debug)]
 struct StaticConfig {
     secret_key: SecretKey,
-    transport_config: Arc<quinn::TransportConfig>,
+    transport_config: Arc<TransportConfig>,
     keylog: bool,
 }
 
 impl StaticConfig {
-    /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
-    fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> Result<quinn::ServerConfig> {
+    /// Create a [`ServerConfig`] with the specified ALPN protocols.
+    fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> Result<ServerConfig> {
         let server_config = make_server_config(
             &self.secret_key,
             alpn_protocols,
@@ -318,18 +321,18 @@
         )
     }
 }
-/// Creates a [`quinn::ServerConfig`] with the given secret key and limits.
+/// Creates a [`ServerConfig`] with the given secret key and limits.
 // This return type can not longer be used anywhere in our public API. It is however still
 // used by iroh::node::Node (or rather iroh::node::Builder) to create a plain Quinn
 // endpoint.
 pub fn make_server_config(
     secret_key: &SecretKey,
     alpn_protocols: Vec<Vec<u8>>,
-    transport_config: Arc<quinn::TransportConfig>,
+    transport_config: Arc<TransportConfig>,
     keylog: bool,
-) -> Result<quinn::ServerConfig> {
+) -> Result<ServerConfig> {
     let quic_server_config = tls::make_server_config(secret_key, alpn_protocols, keylog)?;
-    let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
+    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
     server_config.transport_config(transport_config);
 
     Ok(server_config)
@@ -451,7 +454,7 @@ impl Endpoint {
     /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
     /// an error.
     #[instrument(skip_all, fields(me = %self.node_id().fmt_short(), remote = %node_addr.node_id.fmt_short(), alpn = ?String::from_utf8_lossy(alpn)))]
-    pub async fn connect(&self, node_addr: NodeAddr, alpn: &[u8]) -> Result<quinn::Connection> {
+    pub async fn connect(&self, node_addr: NodeAddr, alpn: &[u8]) -> Result<Connection> {
         // Connecting to ourselves is not supported.
         if node_addr.node_id == self.node_id() {
             bail!(
@@ -502,11 +505,7 @@ impl Endpoint {
     /// information being provided by either the discovery service or using
     /// [`Endpoint::add_node_addr`]. See [`Endpoint::connect`] for the details of how it
     /// uses the discovery service to establish a connection to a remote node.
-    pub async fn connect_by_node_id(
-        &self,
-        node_id: NodeId,
-        alpn: &[u8],
-    ) -> Result<quinn::Connection> {
+    pub async fn connect_by_node_id(&self, node_id: NodeId, alpn: &[u8]) -> Result<Connection> {
         let addr = NodeAddr::new(node_id);
         self.connect(addr, alpn).await
     }
@@ -520,7 +519,7 @@ impl Endpoint {
         node_id: NodeId,
         alpn: &[u8],
         addr: QuicMappedAddr,
-    ) -> Result<quinn::Connection> {
+    ) -> Result<Connection> {
         debug!("Attempting connection...");
         let client_config = {
             let alpn_protocols = vec![alpn.to_vec()];
@@ -531,7 +530,7 @@ impl Endpoint {
                 self.static_config.keylog,
             )?;
             let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
-            let mut transport_config = quinn::TransportConfig::default();
+            let mut transport_config = TransportConfig::default();
             transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
             client_config.transport_config(Arc::new(transport_config));
             client_config
@@ -556,7 +555,7 @@ impl Endpoint {
             warn!("rtt-actor not reachable: {err:#}");
         }
         debug!("Connection established");
-        Ok(connection)
+        Ok(Connection { inner: connection })
     }
 
     /// Accepts an incoming connection on the endpoint.
@@ -588,8 +587,8 @@ impl Endpoint {
     ///
     /// # Errors
     ///
-    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
-    /// direct addresses are a subset of ours.
+    /// Will return an error if we attempt to add our own [`NodeId`] to the node map or if
+    /// the direct addresses are a subset of ours.
     pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
         self.add_node_addr_inner(node_addr, magicsock::Source::App)
     }
@@ -604,8 +603,8 @@ impl Endpoint {
     ///
     /// # Errors
     ///
-    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
-    /// direct addresses are a subset of ours.
+    /// Will return an error if we attempt to add our own [`NodeId`] to the node map or
+    /// if the direct addresses are a subset of ours.
     pub fn add_node_addr_with_source(
         &self,
         node_addr: NodeAddr,
@@ -828,7 +827,7 @@ impl Endpoint {
     /// Closes the QUIC endpoint and the magic socket.
     ///
     /// This will close all open QUIC connections with the provided error_code and
-    /// reason. See [`quinn::Connection`] for details on how these are interpreted.
+    /// reason. See [`Connection`] for details on how these are interpreted.
     ///
     /// It will then wait for all connections to actually be shutdown, and afterwards
     /// close the magic socket.
@@ -1062,7 +1061,7 @@ pub struct IncomingFuture {
 }
 
 impl Future for IncomingFuture {
-    type Output = Result<quinn::Connection, ConnectionError>;
+    type Output = Result<Connection, ConnectionError>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
@@ -1071,7 +1070,7 @@ impl Future for IncomingFuture {
             Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
             Poll::Ready(Ok(conn)) => {
                 try_send_rtt_msg(&conn, this.ep);
-                Poll::Ready(Ok(conn))
+                Poll::Ready(Ok(Connection { inner: conn }))
             }
         }
     }
@@ -1088,18 +1087,18 @@ pub struct Connecting {
 impl Connecting {
     /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security.
-    pub fn into_0rtt(self) -> Result<(quinn::Connection, quinn::ZeroRttAccepted), Self> {
+    pub fn into_0rtt(self) -> Result<(Connection, ZeroRttAccepted), Self> {
         match self.inner.into_0rtt() {
             Ok((conn, zrtt_accepted)) => {
                 try_send_rtt_msg(&conn, &self.ep);
-                Ok((conn, zrtt_accepted))
+                Ok((Connection { inner: conn }, zrtt_accepted))
             }
             Err(inner) => Err(Self { inner, ep: self.ep }),
         }
     }
 
     /// Parameters negotiated during the handshake
-    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, quinn::ConnectionError> {
+    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
         self.inner.handshake_data().await
     }
 
@@ -1129,7 +1128,7 @@ impl Connecting {
 }
 
 impl Future for Connecting {
-    type Output = Result<quinn::Connection, ConnectionError>;
+    type Output = Result<Connection, ConnectionError>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
@@ -1138,15 +1137,345 @@ impl Future for Connecting {
             Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
             Poll::Ready(Ok(conn)) => {
                 try_send_rtt_msg(&conn, this.ep);
-                Poll::Ready(Ok(conn))
+                Poll::Ready(Ok(Connection { inner: conn }))
             }
         }
     }
 }
 
-/// Extract the [`PublicKey`] from the peer's TLS certificate.
-// TODO: make this a method now
-pub fn get_remote_node_id(connection: &quinn::Connection) -> Result<PublicKey> {
+/// A QUIC connection.
+///
+/// If all references to a connection (including every clone of the Connection handle,
+/// streams of incoming streams, and the various stream types) have been dropped, then the
+/// connection will be automatically closed with an error_code of 0 and an empty reason. You
+/// can also close the connection explicitly by calling Connection::close().
+///
+/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
+/// receiving CONNECTION_CLOSE the peer may drop any stream data not yet delivered to the
+/// application. Connection::close() describes in more detail how to gracefully close a
+/// connection without losing application data.
+///
+/// May be cloned to obtain another handle to the same connection.
+#[derive(Debug, Clone)]
+pub struct Connection {
+    inner: quinn::Connection,
+}
+
+impl Connection {
+    /// Initiates a new outgoing unidirectional stream.
+    ///
+    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
+    /// consequence, the peer won’t be notified that a stream has been opened until the
+    /// stream is actually used.
+    #[inline]
+    pub fn open_uni(&self) -> OpenUni<'_> {
+        self.inner.open_uni()
+    }
+
+    /// Initiates a new outgoing bidirectional stream.
+    ///
+    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
+    /// consequence, the peer won't be notified that a stream has been opened until the
+    /// stream is actually used. Calling [`open_bi`] then waiting on the [`RecvStream`]
+    /// without writing anything to [`SendStream`] will never succeed.
+    ///
+    /// [`open_bi`]: Connection::open_bi
+    #[inline]
+    pub fn open_bi(&self) -> OpenBi<'_> {
+        self.inner.open_bi()
+    }
+
+    /// Accepts the next incoming uni-directional stream.
+    #[inline]
+    pub fn accept_uni(&self) -> AcceptUni<'_> {
+        self.inner.accept_uni()
+    }
+
+    /// Accept the next incoming bidirectional stream.
+    ///
+    /// **Important Note**: The `Connection` that calls [`open_bi`] must write to its
+    /// [`SendStream`] before the peer `Connection` is able to accept the stream using
+    /// `accept_bi()`. Calling [`open_bi`] then waiting on the [`RecvStream`] without
+    /// writing anything to the connected [`SendStream`] will never succeed.
+    ///
+    /// [`open_bi`]: Connection::open_bi
+    #[inline]
+    pub fn accept_bi(&self) -> AcceptBi<'_> {
+        self.inner.accept_bi()
+    }
+
+    /// Receives an application datagram.
+    #[inline]
+    pub fn read_datagram(&self) -> ReadDatagram<'_> {
+        self.inner.read_datagram()
+    }
+
+    /// Wait for the connection to be closed for any reason.
+    ///
+    /// Despite the return type's name, closed connections are often not an error condition
+    /// at the application layer. Cases that might be routine include
+    /// [`ConnectionError::LocallyClosed`] and [`ConnectionError::ApplicationClosed`].
+    #[inline]
+    pub async fn closed(&self) -> ConnectionError {
+        self.inner.closed().await
+    }
+
+    /// If the connection is closed, the reason why.
+    ///
+    /// Returns `None` if the connection is still open.
+    #[inline]
+    pub fn close_reason(&self) -> Option<ConnectionError> {
+        self.inner.close_reason()
+    }
+
+    /// Closes the connection immediately.
+    ///
+    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
+    /// more data is sent to the peer and the peer may drop buffered data upon receiving the
+    /// CONNECTION_CLOSE frame.
+    ///
+    /// `error_code` and `reason` are not interpreted, and are provided directly to the
+    /// peer.
+    ///
+    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds
+    /// that it is preserved in full, it should be kept under 1KiB.
+    ///
+    /// # Gracefully closing a connection
+    ///
+    /// Only the peer last receiving application data can be certain that all data is
+    /// delivered. The only reliable action it can then take is to close the connection,
+    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
+    /// frame is very likely if both endpoints stay online long enough, calling
+    /// [`Endpoint::close()`] will wait to provide sufficient time. Otherwise, the
+    /// remote peer will time out the connection, provided that the idle timeout is not
+    /// disabled.
+    ///
+    /// The sending side can not guarantee all stream data is delivered to the remote
+    /// application. It only knows the data is delivered to the QUIC stack of the remote
+    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
+    /// [`close()`] the remote endpoint may drop any data it received but is as yet
+    /// undelivered to the application, including data that was acknowledged as received to
+    /// the local endpoint.
+    ///
+    /// [`Endpoint::close()`]: Endpoint::close
+    /// [`close()`]: Connection::close
+    #[inline]
+    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
+        self.inner.close(error_code, reason)
+    }
+
+    /// Transmits `data` as an unreliable, unordered application datagram.
+    ///
+    /// Application datagrams are a low-level primitive. They may be lost or delivered out
+    /// of order, and `data` must both fit inside a single QUIC packet and be smaller than
+    /// the maximum dictated by the peer.
+    #[inline]
+    pub fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SendDatagramError> {
+        self.inner.send_datagram(data)
+    }
+
+    // TODO: It seems `SendDatagram` is not yet exposed by quinn. This has been fixed
+    // upstream and will be in the next release.
+    // /// Transmits `data` as an unreliable, unordered application datagram
+    // ///
+    // /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
+    // /// conditions, which effectively prioritizes old datagrams over new datagrams.
+    // ///
+    // /// See [`send_datagram()`] for details.
+    // ///
+    // /// [`send_datagram()`]: Connection::send_datagram
+    // #[inline]
+    // pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> {
+    //     self.inner.send_datagram_wait(data)
+    // }
+
+    /// Computes the maximum size of datagrams that may be passed to [`send_datagram`].
+    ///
+    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
+    ///
+    /// This may change over the lifetime of a connection according to variation in the path
+    /// MTU estimate. The peer can also enforce an arbitrarily small fixed limit, but if the
+    /// peer's limit is large this is guaranteed to be a little over a kilobyte at minimum.
+    ///
+    /// Not necessarily the maximum size of received datagrams.
+    ///
+    /// [`send_datagram`]: Self::send_datagram
+    #[inline]
+    pub fn max_datagram_size(&self) -> Option<usize> {
+        self.inner.max_datagram_size()
+    }
+
+    /// Bytes available in the outgoing datagram buffer.
+    ///
+    /// When greater than zero, calling [`send_datagram`] with a
+    /// datagram of at most this size is guaranteed not to cause older datagrams to be
+    /// dropped.
+    ///
+    /// [`send_datagram`]: Self::send_datagram
+    #[inline]
+    pub fn datagram_send_buffer_space(&self) -> usize {
+        self.inner.datagram_send_buffer_space()
+    }
+
+    /// The peer's UDP address.
+    ///
+    /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will,
+    /// e.g. when switching to a cellular internet connection.
+    #[inline]
+    pub fn remote_address(&self) -> SocketAddr {
+        self.inner.remote_address()
+    }
+
+    /// The local IP address which was used when the peer established the connection.
+    ///
+    /// This can be different from the address the endpoint is bound to, in case the
+    /// endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
+    ///
+    /// This will return `None` for clients, or when the platform does not expose this
+    /// information. See [`quinn::udp::RecvMeta::dst_ip`] for a list of supported
+    /// platforms.
+    #[inline]
+    pub fn local_ip(&self) -> Option<IpAddr> {
+        self.inner.local_ip()
+    }
+
+    /// Current best estimate of this connection's latency (round-trip-time).
+    #[inline]
+    pub fn rtt(&self) -> Duration {
+        self.inner.rtt()
+    }
+
+    /// Returns connection statistics.
+    #[inline]
+    pub fn stats(&self) -> ConnectionStats {
+        self.inner.stats()
+    }
+
+    /// Current state of the congestion control algorithm, for debugging purposes.
+    #[inline]
+    pub fn congestion_state(&self) -> Box<dyn quinn_proto::congestion::Controller> {
+        self.inner.congestion_state()
+    }
+
+    /// Parameters negotiated during the handshake.
+    ///
+    /// Guaranteed to return `Some` on fully established connections or after
+    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for
+    /// details on the returned value.
+    ///
+    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
+    #[inline]
+    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
+        self.inner.handshake_data()
+    }
+
+    /// Extracts the ALPN protocol from the peer's handshake data.
+    pub fn alpn(&self) -> Result<Vec<u8>> {
+        let data = self.handshake_data().context("handshake data missing")?;
+        match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
+            Ok(data) => match data.protocol {
+                Some(protocol) => Ok(protocol),
+                None => bail!("no ALPN protocol available"),
+            },
+            Err(_) => bail!("unknown handshake type"),
+        }
+    }
+
+    /// Cryptographic identity of the peer.
+    ///
+    /// The dynamic type returned is determined by the configured [`Session`]. For the
+    /// default `rustls` session, the return value can be [`downcast`] to a
+    /// Vec<[rustls::pki_types::CertificateDer]>
+    ///
+    /// [`Session`]: quinn_proto::crypto::Session
+    /// [`downcast`]: Box::downcast
+    #[inline]
+    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
+        self.inner.peer_identity()
+    }
+
+    /// Returns the [`NodeId`] from the peer's TLS certificate.
+    ///
+    /// The [`PublicKey`] of a node is also known as a [`NodeId`]. This [`PublicKey`] is
+    /// included in the TLS certificate presented during the handshake when connecting.
+    /// This function allows you to get the [`NodeId`] of the remote node of this
+    /// connection.
+    ///
+    /// [`PublicKey`]: crate::key::PublicKey
+    pub fn remote_node_id(&self) -> Result<NodeId> {
+        let data = self.peer_identity();
+        match data {
+            None => bail!("no peer certificate found"),
+            Some(data) => match data.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
+                Ok(certs) => {
+                    if certs.len() != 1 {
+                        bail!(
+                            "expected a single peer certificate, but {} found",
+                            certs.len()
+                        );
+                    }
+                    let cert = tls::certificate::parse(&certs[0])?;
+                    Ok(cert.peer_id())
+                }
+                Err(_) => bail!("invalid peer certificate"),
+            },
+        }
+    }
+
+    /// A stable identifier for this connection.
+    ///
+    /// Peer addresses and connection IDs can change, but this value will remain fixed for
+    /// the lifetime of the connection.
+    #[inline]
+    pub fn stable_id(&self) -> usize {
+        self.inner.stable_id()
+    }
+
+    /// Derives keying material from this connection's TLS session secrets.
+    ///
+    /// When both peers call this method with the same `label` and `context`
+    /// arguments and `output` buffers of equal length, they will get the
+    /// same sequence of bytes in `output`. These bytes are cryptographically
+    /// strong and pseudorandom, and are suitable for use as keying material.
+    ///
+    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
+    #[inline]
+    pub fn export_keying_material(
+        &self,
+        output: &mut [u8],
+        label: &[u8],
+        context: &[u8],
+    ) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> {
+        self.inner.export_keying_material(output, label, context)
+    }
+
+    /// Modifies the number of unidirectional streams that may be concurrently opened.
+    ///
+    /// No streams may be opened by the peer unless fewer than `count` are already
+    /// open. Large `count`s increase both minimum and worst-case memory consumption.
+    #[inline]
+    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
+        self.inner.set_max_concurrent_uni_streams(count)
+    }
+
+    /// See [`quinn_proto::TransportConfig::receive_window`].
+    #[inline]
+    pub fn set_receive_window(&self, receive_window: VarInt) {
+        self.inner.set_receive_window(receive_window)
+    }
+
+    /// Modifies the number of bidirectional streams that may be concurrently opened.
+    ///
+    /// No streams may be opened by the peer unless fewer than `count` are already
+    /// open. Large `count`s increase both minimum and worst-case memory consumption.
+    #[inline]
+    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
+        self.inner.set_max_concurrent_bi_streams(count)
+    }
+}
+
+/// Extracts the [`NodeId`] from the peer's TLS certificate.
+fn get_remote_node_id(connection: &quinn::Connection) -> Result<NodeId> {
     let data = connection.peer_identity();
     match data {
         None => bail!("no peer certificate found"),
@@ -1488,7 +1817,7 @@ mod tests {
             println!("[server] round {}", i + 1);
             let incoming = ep.accept().await.unwrap();
             let conn = incoming.await.unwrap();
-            let peer_id = get_remote_node_id(&conn).unwrap();
+            let peer_id = conn.remote_node_id().unwrap();
            info!(%i, peer = %peer_id.fmt_short(), "accepted connection");
             let (mut send, mut recv) = conn.accept_bi().await.unwrap();
             let mut buf = vec![0u8; chunk_size];
@@ -1604,7 +1933,7 @@ mod tests {
             let mut iconn = incoming.accept().unwrap();
             let alpn = iconn.alpn().await.unwrap();
             let conn = iconn.await.unwrap();
-            let node_id = get_remote_node_id(&conn).unwrap();
+            let node_id = conn.remote_node_id().unwrap();
             assert_eq!(node_id, src);
             assert_eq!(alpn, TEST_ALPN);
             let (mut send, mut recv) = conn.accept_bi().await.unwrap();
@@ -1678,7 +2007,7 @@ mod tests {
            .await
            .unwrap();
 
-        async fn handle_direct_conn(ep: &Endpoint, node_id: PublicKey) -> Result<()> {
+        async fn handle_direct_conn(ep: &Endpoint, node_id: NodeId) -> Result<()> {
            let mut stream = ep.conn_type_stream(node_id)?;
            let src = ep.node_id().fmt_short();
            let dst = node_id.fmt_short();
@@ -1694,7 +2023,7 @@ mod tests {
        async fn accept(ep: &Endpoint) -> NodeId {
            let incoming = ep.accept().await.unwrap();
            let conn = incoming.await.unwrap();
-           let node_id = get_remote_node_id(&conn).unwrap();
+           let node_id = conn.remote_node_id().unwrap();
            tracing::info!(node_id=%node_id.fmt_short(), "accepted connection");
            node_id
        }
diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs
index c23a9c927b..7250d698c2 100644
--- a/iroh/examples/custom-protocol.rs
+++ b/iroh/examples/custom-protocol.rs
@@ -46,10 +46,7 @@ use futures_lite::future::Boxed as BoxedFuture;
 use iroh::{
     blobs::Hash,
     client::blobs,
-    net::{
-        endpoint::{get_remote_node_id, Connecting},
-        Endpoint, NodeId,
-    },
+    net::{endpoint::Connecting, Endpoint, NodeId},
     node::ProtocolHandler,
 };
 use tracing_subscriber::{prelude::*, EnvFilter};
@@ -146,7 +143,7 @@ impl ProtocolHandler for BlobSearch {
         // Wait for the connection to be fully established.
         let connection = connecting.await?;
         // We can get the remote's node id from the connection.
-        let node_id = get_remote_node_id(&connection)?;
+        let node_id = connection.remote_node_id()?;
         println!("accepted connection from {node_id}");
 
         // Our protocol is a simple request-response protocol, so we expect the
diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs
index e2d655a556..322ec197aa 100644
--- a/iroh/tests/provide.rs
+++ b/iroh/tests/provide.rs
@@ -10,7 +10,10 @@ use bytes::Bytes;
 use futures_lite::FutureExt;
 use iroh::node::{Builder, DocsStorage};
 use iroh_base::node_addr::AddrInfoOptions;
-use iroh_net::{defaults::staging::default_relay_map, key::SecretKey, NodeAddr, NodeId};
+use iroh_net::defaults::staging::default_relay_map;
+use iroh_net::endpoint::Connection;
+use iroh_net::key::SecretKey;
+use iroh_net::{NodeAddr, NodeId};
 use rand::RngCore;
 
 use bao_tree::{blake3, ChunkNum, ChunkRanges};
@@ -27,7 +30,7 @@ use iroh_blobs::{
 };
 
 /// Create a new endpoint and dial a peer, returning the connection.
-async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result<quinn::Connection> {
+async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result<Connection> {
     let endpoint = iroh_net::Endpoint::builder()
         .secret_key(secret_key)
        .bind()
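
A minimal usage sketch of the API change above (not part of the diff): callers that previously used the free function iroh_net::endpoint::get_remote_node_id now call Connection::remote_node_id() on the wrapped connection type. The helper name below is illustrative only.

// Sketch: identify the remote peer of an accepted or dialed connection.
use anyhow::Result;
use iroh_net::{endpoint::Connection, NodeId};

fn remote_of(conn: &Connection) -> Result<NodeId> {
    // Before this change: iroh_net::endpoint::get_remote_node_id(&conn)?
    let node_id = conn.remote_node_id()?;
    println!("connected to {node_id}");
    Ok(node_id)
}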
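
The reworked examples also replace the socket-address/TLS-certificate handshake with a NodeTicket exchanged out of band. A condensed sketch of that flow, assuming an iroh_net::Endpoint built as in connect/mod.rs above; the function names and error handling here are illustrative, not taken from the diff.

// Sketch: the provider prints a ticket, the fetcher parses it and dials with the example ALPN.
use anyhow::Result;
use iroh_net::{ticket::NodeTicket, Endpoint};

const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";

// Provider side: the ticket bundles our NodeId and current addressing info.
async fn print_ticket(endpoint: &Endpoint) -> Result<()> {
    let ticket = NodeTicket::new(endpoint.node_addr().await?)?;
    println!("Node ticket: {ticket}");
    Ok(())
}

// Fetcher side: parse the ticket from the command line and dial the node it describes.
async fn connect_with_ticket(endpoint: &Endpoint, ticket: &str) -> Result<()> {
    let ticket: NodeTicket = ticket.parse()?;
    let connection = endpoint
        .connect(ticket.node_addr().clone(), EXAMPLE_ALPN)
        .await?;
    println!("connected to {}", connection.remote_node_id()?);
    Ok(())
}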