From 2a5c09d6c356bd8e16a05f6045b38199f8a4effb Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 27 Mar 2024 15:57:54 +0100 Subject: [PATCH] feat: set more than one node on download requests --- iroh-cli/src/commands/blob.rs | 2 +- iroh/examples/collection-fetch.rs | 4 +- iroh/examples/hello-world-fetch.rs | 4 +- iroh/src/node/rpc.rs | 66 ++++++++++++++++++++++++++---- iroh/src/rpc_protocol.rs | 19 ++++++--- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 2ee8e81b3f4..d326f8ffa0e 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -263,7 +263,7 @@ impl BlobCommands { .download(BlobDownloadRequest { hash, format, - peer: node_addr, + nodes: vec![node_addr], tag, mode, }) diff --git a/iroh/examples/collection-fetch.rs b/iroh/examples/collection-fetch.rs index bbf6cb19593..53fac68f52f 100644 --- a/iroh/examples/collection-fetch.rs +++ b/iroh/examples/collection-fetch.rs @@ -60,12 +60,12 @@ async fn main() -> Result<()> { // When interacting with the iroh API, you will most likely be using blobs and collections. format: ticket.format(), - // The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node. + // The `nodes` field is a list of `NodeAddr`, where each combines all of the known address information we have for the remote node. // This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the relay url of that node. The relay url is the url of the relay server that that node is connected to. // If the direct UDP addresses to that node do not work, than we can use the relay node to attempt to holepunch between your current node and the remote node. // If holepunching fails, iroh will use the relay node to proxy a connection to the remote node over HTTPS. // Thankfully, the ticket contains all of this information - peer: ticket.node_addr().clone(), + nodes: vec![ticket.node_addr().clone()], // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. tag: iroh::rpc_protocol::SetTagOption::Auto, diff --git a/iroh/examples/hello-world-fetch.rs b/iroh/examples/hello-world-fetch.rs index 71f86c00ad5..fcc75cdbcaf 100644 --- a/iroh/examples/hello-world-fetch.rs +++ b/iroh/examples/hello-world-fetch.rs @@ -60,12 +60,12 @@ async fn main() -> Result<()> { // When interacting with the iroh API, you will most likely be using blobs and collections. format: ticket.format(), - // The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node. + // The `nodes` field is a list of `NodeAddr`, where each combines all of the known address information we have for the remote node. // This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the relay url of that node. The relay url is the url of the relay server that that node is connected to. // If the direct UDP addresses to that node do not work, than we can use the relay node to attempt to holepunch between your current node and the remote node. // If holepunching fails, iroh will use the relay node to proxy a connection to the remote node over HTTPS. // Thankfully, the ticket contains all of this information - peer: ticket.node_addr().clone(), + nodes: vec![ticket.node_addr().clone()], // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. tag: iroh::rpc_protocol::SetTagOption::Auto, diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index cc53d2ded4f..23ba2fd55bc 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -3,7 +3,7 @@ use std::io; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, ensure, Result}; use futures::{FutureExt, Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::RpcResult; @@ -23,7 +23,7 @@ use iroh_bytes::{ HashAndFormat, }; use iroh_io::AsyncSliceReader; -use iroh_net::{MagicEndpoint, NodeAddr, NodeId}; +use iroh_net::{MagicEndpoint, NodeAddr}; use quic_rpc::{ server::{RpcChannel, RpcServerError}, ServiceEndpoint, @@ -1033,19 +1033,26 @@ where let BlobDownloadRequest { hash, format, - peer: node, + nodes, tag, mode, } = req; let hash_and_format = HashAndFormat { hash, format }; let stats = match mode { DownloadMode::Queued => { - let node_id = node.node_id; - endpoint.add_node_addr(node)?; - download_queued(downloader, hash_and_format, node_id, tag, progress.clone()).await? + download_queued( + endpoint, + downloader, + hash_and_format, + nodes, + tag, + progress.clone(), + ) + .await? } DownloadMode::Direct => { - download_direct(db, endpoint, hash_and_format, node, tag, progress.clone()).await? + download_direct_from_nodes(db, endpoint, hash_and_format, nodes, tag, progress.clone()) + .await? } }; @@ -1055,13 +1062,19 @@ where } async fn download_queued( + endpoint: MagicEndpoint, downloader: &Downloader, hash_and_format: HashAndFormat, - node_id: NodeId, + nodes: Vec, tag: SetTagOption, progress: FlumeProgressSender, ) -> Result { - let req = DownloadRequest::new(hash_and_format, vec![node_id]) + let mut node_ids = Vec::with_capacity(nodes.len()); + for node in nodes { + node_ids.push(node.node_id); + endpoint.add_node_addr(node)?; + } + let req = DownloadRequest::new(hash_and_format, node_ids) .progress_sender(progress) .tag(tag); let handle = downloader.queue(req).await; @@ -1069,6 +1082,41 @@ async fn download_queued( Ok(stats) } +async fn download_direct_from_nodes( + db: &D, + endpoint: MagicEndpoint, + hash_and_format: HashAndFormat, + nodes: Vec, + tag: SetTagOption, + progress: FlumeProgressSender, +) -> Result +where + D: BaoStore, +{ + ensure!(!nodes.is_empty(), "No nodes to download from provided."); + let mut last_err = None; + for node in nodes { + let node_id = node.node_id; + match download_direct( + db, + endpoint.clone(), + hash_and_format, + node, + tag.clone(), + progress.clone(), + ) + .await + { + Ok(stats) => return Ok(stats), + Err(err) => { + debug!(?err, node = &node_id.fmt_short(), "Download failed"); + last_err = Some(err) + } + } + } + Err(last_err.unwrap()) +} + async fn download_direct( db: &D, endpoint: MagicEndpoint, diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index a44a7efd6b0..af34dfbc1a6 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -96,8 +96,13 @@ pub struct BlobDownloadRequest { /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as /// well. pub format: BlobFormat, - /// This mandatory field specifies the peer to download the data from. - pub peer: NodeAddr, + /// This mandatory field specifies the nodes to download the data from. + /// + /// If set to more than a single node, they will all be tried. If `mode` is set to + /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. + /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, + /// if the concurrency limits permit. + pub nodes: Vec, /// Optional tag to tag the data with. pub tag: SetTagOption, /// Whether to directly start the download or add it to the downlod queue. @@ -107,10 +112,14 @@ pub struct BlobDownloadRequest { /// Set the mode for whether to directly start the download or add it to the download queue. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum DownloadMode { - /// Start the download right away. This also bypasses the downloader concurrency limits. + /// Start the download right away. + /// + /// No concurrency limits or queuing will be applied. It is up to the user to manage download + /// concurrency. Direct, - /// Queue the download. The download queue will be processed in-order, respecting the - /// downloader concurrency limit. + /// Queue the download. + /// + /// The download queue will be processed in-order, while respecting the downloader concurrency limits. Queued, }