Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unstable dynamic routing feature #599

Merged
merged 11 commits into from
Sep 23, 2024
Merged
630 changes: 404 additions & 226 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 17 additions & 1 deletion ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"]
include = ["src", "Cargo.toml", "../LICENSE", "README.md"]

[dependencies]
arc-swap = { version = "1.7", optional = true }
async-channel = { version = "1.9", optional = true }
async-lock = "3.3"
async-trait = { version = "0.1", optional = true }
async-watch = { version = "0.3", optional = true }
backoff = "0.4.0"
cached = { version = "0.52", features = ["ahash"], default-features = false }
candid = { workspace = true }
Expand All @@ -41,8 +45,10 @@ serde_cbor = { workspace = true }
serde_repr = { workspace = true }
sha2 = { workspace = true }
simple_asn1 = "0.6.1"
stop-token = { version = "0.7", optional = true }
thiserror = { workspace = true }
time = { workspace = true }
tracing = { version = "0.1", optional = true }
url = "2.1.0"

[dependencies.reqwest]
Expand All @@ -67,6 +73,7 @@ web-sys = { version = "0.3", features = ["Window"], optional = true }

[dev-dependencies]
serde_json.workspace = true
tracing-subscriber = "0.3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
tokio = { workspace = true, features = ["full"] }
Expand Down Expand Up @@ -99,8 +106,17 @@ wasm-bindgen = [
"backoff/wasm-bindgen",
"cached/wasm",
]
_internal_dynamic-routing = [
"dep:arc-swap",
"dep:async-channel",
"dep:async-trait",
"dep:async-watch",
"dep:stop-token",
"tracing",
]
tracing = ["dep:tracing"] # Does very little right now.

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"]
rustdoc-args = ["--cfg=docsrs"]
features = ["hyper"]
features = ["_internal_dynamic-routing"]
32 changes: 32 additions & 0 deletions ic-agent/src/agent/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,38 @@ impl AgentBuilder {
Agent::new(self.config)
}

#[cfg(all(feature = "reqwest", not(target_family = "wasm")))]
/// Set the dynamic transport layer for the [`Agent`], performing continuos discovery of the API boundary nodes and routing traffic via them based on the latencies.
pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self {
use crate::agent::http_transport::{
dynamic_routing::{
dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
node::Node,
snapshot::latency_based_routing::LatencyRoutingSnapshot,
},
route_provider::RouteProvider,
ReqwestTransport,
};

// TODO: This is a temporary solution to get the seed node.
let seed = Node::new(IC0_SEED_DOMAIN).unwrap();

let route_provider = DynamicRouteProviderBuilder::new(
LatencyRoutingSnapshot::new(),
vec![seed],
client.clone(),
)
.build()
.await;

let route_provider = Arc::new(route_provider) as Arc<dyn RouteProvider>;

let transport = ReqwestTransport::create_with_client_route(route_provider, client)
.expect("failed to create transport");

self.with_transport(transport)
}

/// Set the URL of the [Agent].
pub fn with_url<S: Into<String>>(self, url: S) -> Self {
self.with_route_provider(url.into().parse::<Url>().unwrap())
Expand Down
93 changes: 92 additions & 1 deletion ic-agent/src/agent/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use url::Url;

use crate::agent::AgentError;

#[cfg(feature = "_internal_dynamic-routing")]
pub mod dynamic_routing;

const IC0_DOMAIN: &str = "ic0.app";
const ICP0_DOMAIN: &str = "icp0.io";
const ICP_API_DOMAIN: &str = "icp-api.io";
Expand All @@ -18,8 +21,19 @@ const LOCALHOST_SUB_DOMAIN: &str = ".localhost";

/// A [`RouteProvider`] for dynamic generation of routing urls.
pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// Generate next routing url
/// Generates the next routing URL based on the internal routing logic.
///
/// This method returns a single `Url` that can be used for routing.
/// The logic behind determining the next URL can vary depending on the implementation
fn route(&self) -> Result<Url, AgentError>;

/// Generates up to `n` different routing URLs in order of priority.
///
/// This method returns a vector of `Url` instances, each representing a routing
/// endpoint. The URLs are ordered by priority, with the most preferred route
/// appearing first. The returned vector can contain fewer than `n` URLs if
/// fewer are available.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;
}

/// A simple implementation of the [`RouteProvider`] which produces an even distribution of the urls from the input ones.
Expand All @@ -41,6 +55,28 @@ impl RouteProvider for RoundRobinRouteProvider {
let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
Ok(self.routes[prev_idx % self.routes.len()].clone())
}

fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
if n == 0 {
return Ok(Vec::new());
}

if n >= self.routes.len() {
return Ok(self.routes.clone());
}

let idx = self.current_idx.fetch_add(n, Ordering::Relaxed) % self.routes.len();
let mut urls = Vec::with_capacity(n);

if self.routes.len() - idx >= n {
urls.extend_from_slice(&self.routes[idx..idx + n]);
} else {
urls.extend_from_slice(&self.routes[idx..]);
urls.extend_from_slice(&self.routes[..n - urls.len()]);
}

Ok(urls)
}
}

impl RoundRobinRouteProvider {
Expand Down Expand Up @@ -77,6 +113,9 @@ impl RouteProvider for Url {
fn route(&self) -> Result<Url, AgentError> {
Ok(self.clone())
}
fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
Ok(vec![self.route()?])
}
}

#[cfg(test)]
Expand Down Expand Up @@ -108,4 +147,56 @@ mod tests {
.collect();
assert_eq!(expected_urls, urls);
}

#[test]
fn test_n_routes() {
// Test with an empty list of urls
let provider = RoundRobinRouteProvider::new(Vec::<&str>::new())
.expect("failed to create a route provider");
let urls_iter = provider.n_ordered_routes(1).expect("failed to get urls");
assert!(urls_iter.is_empty());
// Test with non-empty list of urls
let provider = RoundRobinRouteProvider::new(vec![
"https://url1.com",
"https://url2.com",
"https://url3.com",
"https://url4.com",
"https://url5.com",
])
.expect("failed to create a route provider");
// First call
let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
let expected_urls: Vec<Url> = ["https://url1.com", "https://url2.com", "https://url3.com"]
.iter()
.map(|url_str| Url::parse(url_str).expect("invalid URL"))
.collect();
assert_eq!(urls, expected_urls);
// Second call
let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
let expected_urls: Vec<Url> = ["https://url4.com", "https://url5.com", "https://url1.com"]
.iter()
.map(|url_str| Url::parse(url_str).expect("invalid URL"))
.collect();
assert_eq!(urls, expected_urls);
// Third call
let urls: Vec<_> = provider.n_ordered_routes(2).expect("failed to get urls");
let expected_urls: Vec<Url> = ["https://url2.com", "https://url3.com"]
.iter()
.map(|url_str| Url::parse(url_str).expect("invalid URL"))
.collect();
assert_eq!(urls, expected_urls);
// Fourth call
let urls: Vec<_> = provider.n_ordered_routes(5).expect("failed to get urls");
let expected_urls: Vec<Url> = [
"https://url1.com",
"https://url2.com",
"https://url3.com",
"https://url4.com",
"https://url5.com",
]
.iter()
.map(|url_str| Url::parse(url_str).expect("invalid URL"))
.collect();
assert_eq!(urls, expected_urls);
}
}
Loading