Skip to content

Commit

Permalink
feat: set more than one node on download requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando authored and ppodolsky committed Apr 1, 2024
1 parent 0ed7744 commit 2a5c09d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 19 deletions.
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl BlobCommands {
.download(BlobDownloadRequest {
hash,
format,
peer: node_addr,
nodes: vec![node_addr],
tag,
mode,
})
Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/collection-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ async fn main() -> Result<()> {
// When interacting with the iroh API, you will most likely be using blobs and collections.
format: ticket.format(),

// The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node.
// The `nodes` field is a list of `NodeAddr`, where each combines all of the known address information we have for the remote node.
// This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the relay url of that node. The relay url is the url of the relay server that that node is connected to.
// If the direct UDP addresses to that node do not work, than we can use the relay node to attempt to holepunch between your current node and the remote node.
// If holepunching fails, iroh will use the relay node to proxy a connection to the remote node over HTTPS.
// Thankfully, the ticket contains all of this information
peer: ticket.node_addr().clone(),
nodes: vec![ticket.node_addr().clone()],

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,
Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ async fn main() -> Result<()> {
// When interacting with the iroh API, you will most likely be using blobs and collections.
format: ticket.format(),

// The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node.
// The `nodes` field is a list of `NodeAddr`, where each combines all of the known address information we have for the remote node.
// This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the relay url of that node. The relay url is the url of the relay server that that node is connected to.
// If the direct UDP addresses to that node do not work, than we can use the relay node to attempt to holepunch between your current node and the remote node.
// If holepunching fails, iroh will use the relay node to proxy a connection to the remote node over HTTPS.
// Thankfully, the ticket contains all of this information
peer: ticket.node_addr().clone(),
nodes: vec![ticket.node_addr().clone()],

// You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp.
tag: iroh::rpc_protocol::SetTagOption::Auto,
Expand Down
66 changes: 57 additions & 9 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, ensure, Result};
use futures::{FutureExt, Stream, StreamExt};
use genawaiter::sync::{Co, Gen};
use iroh_base::rpc::RpcResult;
Expand All @@ -23,7 +23,7 @@ use iroh_bytes::{
HashAndFormat,
};
use iroh_io::AsyncSliceReader;
use iroh_net::{MagicEndpoint, NodeAddr, NodeId};
use iroh_net::{MagicEndpoint, NodeAddr};
use quic_rpc::{
server::{RpcChannel, RpcServerError},
ServiceEndpoint,
Expand Down Expand Up @@ -1033,19 +1033,26 @@ where
let BlobDownloadRequest {
hash,
format,
peer: node,
nodes,
tag,
mode,
} = req;
let hash_and_format = HashAndFormat { hash, format };
let stats = match mode {
DownloadMode::Queued => {
let node_id = node.node_id;
endpoint.add_node_addr(node)?;
download_queued(downloader, hash_and_format, node_id, tag, progress.clone()).await?
download_queued(
endpoint,
downloader,
hash_and_format,
nodes,
tag,
progress.clone(),
)
.await?
}
DownloadMode::Direct => {
download_direct(db, endpoint, hash_and_format, node, tag, progress.clone()).await?
download_direct_from_nodes(db, endpoint, hash_and_format, nodes, tag, progress.clone())
.await?
}
};

Expand All @@ -1055,20 +1062,61 @@ where
}

async fn download_queued(
endpoint: MagicEndpoint,
downloader: &Downloader,
hash_and_format: HashAndFormat,
node_id: NodeId,
nodes: Vec<NodeAddr>,
tag: SetTagOption,
progress: FlumeProgressSender<DownloadProgress>,
) -> Result<Stats> {
let req = DownloadRequest::new(hash_and_format, vec![node_id])
let mut node_ids = Vec::with_capacity(nodes.len());
for node in nodes {
node_ids.push(node.node_id);
endpoint.add_node_addr(node)?;
}
let req = DownloadRequest::new(hash_and_format, node_ids)
.progress_sender(progress)
.tag(tag);
let handle = downloader.queue(req).await;
let stats = handle.await?;
Ok(stats)
}

async fn download_direct_from_nodes<D>(
db: &D,
endpoint: MagicEndpoint,
hash_and_format: HashAndFormat,
nodes: Vec<NodeAddr>,
tag: SetTagOption,
progress: FlumeProgressSender<DownloadProgress>,
) -> Result<Stats>
where
D: BaoStore,
{
ensure!(!nodes.is_empty(), "No nodes to download from provided.");
let mut last_err = None;
for node in nodes {
let node_id = node.node_id;
match download_direct(
db,
endpoint.clone(),
hash_and_format,
node,
tag.clone(),
progress.clone(),
)
.await
{
Ok(stats) => return Ok(stats),
Err(err) => {
debug!(?err, node = &node_id.fmt_short(), "Download failed");
last_err = Some(err)
}
}
}
Err(last_err.unwrap())
}

async fn download_direct<D>(
db: &D,
endpoint: MagicEndpoint,
Expand Down
19 changes: 14 additions & 5 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,13 @@ pub struct BlobDownloadRequest {
/// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
/// well.
pub format: BlobFormat,
/// This mandatory field specifies the peer to download the data from.
pub peer: NodeAddr,
/// This mandatory field specifies the nodes to download the data from.
///
/// If set to more than a single node, they will all be tried. If `mode` is set to
/// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
/// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
/// if the concurrency limits permit.
pub nodes: Vec<NodeAddr>,
/// Optional tag to tag the data with.
pub tag: SetTagOption,
/// Whether to directly start the download or add it to the downlod queue.
Expand All @@ -107,10 +112,14 @@ pub struct BlobDownloadRequest {
/// Set the mode for whether to directly start the download or add it to the download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadMode {
/// Start the download right away. This also bypasses the downloader concurrency limits.
/// Start the download right away.
///
/// No concurrency limits or queuing will be applied. It is up to the user to manage download
/// concurrency.
Direct,
/// Queue the download. The download queue will be processed in-order, respecting the
/// downloader concurrency limit.
/// Queue the download.
///
/// The download queue will be processed in-order, while respecting the downloader concurrency limits.
Queued,
}

Expand Down

0 comments on commit 2a5c09d

Please sign in to comment.