Skip to content

Commit

Permalink
feat: Add HTTP middleware for Transport replacement (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamspofford-dfinity authored Sep 26, 2024
1 parent 603f1ff commit 8967a7f
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

* Added `AgentBuilder::with_arc_http_middleware` for `Transport`-like functionality at the level of HTTP requests.
* Add support for dynamic routing based on boundary node discovery. This is an internal feature for now, with a feature flag `_internal_dynamic-routing`.

## [0.38.1] - 2024-09-23
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ include = ["src", "Cargo.toml", "../LICENSE", "README.md"]
arc-swap = { version = "1.7", optional = true }
async-channel = { version = "1.9", optional = true }
async-lock = "3.3"
async-trait = { version = "0.1", optional = true }
async-trait = "0.1"
async-watch = { version = "0.3", optional = true }
backoff = "0.4.0"
cached = { version = "0.52", features = ["ahash"], default-features = false }
Expand Down Expand Up @@ -48,6 +48,7 @@ simple_asn1 = "0.6.1"
stop-token = { version = "0.7", optional = true }
thiserror = { workspace = true }
time = { workspace = true }
tower-service = "0.3"
tracing = { version = "0.1", optional = true }
url = "2.1.0"

Expand Down Expand Up @@ -108,7 +109,6 @@ wasm-bindgen = [
_internal_dynamic-routing = [
"dep:arc-swap",
"dep:async-channel",
"dep:async-trait",
"dep:async-watch",
"dep:stop-token",
"tracing",
Expand Down
8 changes: 6 additions & 2 deletions ic-agent/src/agent/agent_config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use reqwest::Client;

use crate::{
agent::{NonceFactory, NonceGenerator},
identity::{anonymous::AnonymousIdentity, Identity},
};
use reqwest::Client;
use std::{sync::Arc, time::Duration};

use super::route_provider::RouteProvider;
use super::{route_provider::RouteProvider, HttpService};

/// A configuration for an agent.
#[non_exhaustive]
Expand All @@ -28,6 +29,8 @@ pub struct AgentConfig {
pub max_response_body_size: Option<usize>,
/// See [`with_max_tcp_error_retries`](super::AgentBuilder::with_max_tcp_error_retries).
pub max_tcp_error_retries: usize,
/// See [`with_arc_http_middleware`](super::AgentBuilder::with_arc_http_middleware).
pub http_service: Option<Arc<dyn HttpService>>,
}

impl Default for AgentConfig {
Expand All @@ -37,6 +40,7 @@ impl Default for AgentConfig {
identity: Arc::new(AnonymousIdentity {}),
ingress_expiry: None,
client: None,
http_service: None,
verify_query_signatures: true,
max_concurrent_requests: 50,
route_provider: None,
Expand Down
20 changes: 19 additions & 1 deletion ic-agent/src/agent/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use std::sync::Arc;

use super::route_provider::RouteProvider;
use super::{route_provider::RouteProvider, HttpService};

/// A builder for an [`Agent`].
#[derive(Default)]
Expand Down Expand Up @@ -131,10 +131,28 @@ impl AgentBuilder {

/// Provide a pre-configured HTTP client to use. Use this to set e.g. HTTP timeouts or proxy configuration.
pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
assert!(
self.config.http_service.is_none(),
"with_arc_http_middleware cannot be called with with_http_client"
);
self.config.client = Some(client);
self
}

/// Provide a custom `reqwest`-compatible HTTP service, e.g. to add per-request headers for custom boundary nodes.
/// Most users will not need this and should use `with_http_client`. Cannot be called with `with_http_client`.
///
/// The trait is automatically implemented for any `tower::Service` impl matching the one `reqwest::Client` uses,
/// including `reqwest-middleware`. This is a low-level interface, and direct implementations must provide all automatic retry logic.
pub fn with_arc_http_middleware(mut self, service: Arc<dyn HttpService>) -> Self {
assert!(
self.config.client.is_none(),
"with_arc_http_middleware cannot be called with with_http_client"
);
self.config.http_service = Some(service);
self
}

/// Retry up to the specified number of times upon encountering underlying TCP errors.
pub fn with_max_tcp_error_retries(mut self, retries: usize) -> Self {
self.config.max_tcp_error_retries = retries;
Expand Down
178 changes: 120 additions & 58 deletions ic-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use agent_config::AgentConfig;
pub use agent_error::AgentError;
use agent_error::HttpErrorPayload;
use async_lock::Semaphore;
use async_trait::async_trait;
pub use builder::AgentBuilder;
use cached::{Cached, TimedCache};
use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
Expand All @@ -27,9 +28,10 @@ pub use ic_transport_types::{
};
pub use nonce::{NonceFactory, NonceGenerator};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
use reqwest::{Body, Client, Request};
use reqwest::{Body, Client, Request, Response};
use route_provider::RouteProvider;
use time::OffsetDateTime;
use tower_service::Service;

#[cfg(test)]
mod agent_test;
Expand Down Expand Up @@ -148,7 +150,7 @@ pub struct Agent {
identity: Arc<dyn Identity>,
ingress_expiry: Duration,
root_key: Arc<RwLock<Vec<u8>>>,
client: Client,
client: Arc<dyn HttpService>,
route_provider: Arc<dyn RouteProvider>,
subnet_key_cache: Arc<Mutex<SubnetCache>>,
concurrent_requests_semaphore: Arc<Semaphore>,
Expand Down Expand Up @@ -180,19 +182,23 @@ impl Agent {
identity: config.identity,
ingress_expiry: config.ingress_expiry.unwrap_or(DEFAULT_INGRESS_EXPIRY),
root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
client: config.http_service.unwrap_or_else(|| {
Arc::new(Retry429Logic {
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
}),
})
}),
route_provider: config
.route_provider
Expand Down Expand Up @@ -1110,40 +1116,13 @@ impl Agent {
Ok(http_request)
};

// Dispatch request with a retry logic only for non-wasm builds.
let response = {
#[cfg(target_family = "wasm")]
{
let http_request = create_request_with_generated_url()?;
self.client.execute(http_request).await?
}
#[cfg(not(target_family = "wasm"))]
{
// RouteProvider generates urls dynamically. Some of these urls can be potentially unhealthy.
// TCP related errors (host unreachable, connection refused, connection timed out, connection reset) can be safely retried with a newly generated url.

let mut retry_count = 0;

loop {
let http_request = create_request_with_generated_url()?;

match self.client.execute(http_request).await {
Ok(response) => break response,
Err(err) => {
// Network-related errors can be retried.
if err.is_connect() {
if retry_count >= self.max_tcp_error_retries {
return Err(AgentError::TransportError(err));
}
retry_count += 1;
continue;
}
return Err(AgentError::TransportError(err));
}
}
}
}
};
let response = self
.client
.call(
&create_request_with_generated_url,
self.max_tcp_error_retries,
)
.await?;

let http_status = response.status();
let response_headers = response.headers().clone();
Expand Down Expand Up @@ -1184,15 +1163,10 @@ impl Agent {
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<(StatusCode, Vec<u8>), AgentError> {
let request_result = loop {
let result = self
.request(method.clone(), endpoint, body.as_ref().cloned())
.await?;
if result.0 != StatusCode::TOO_MANY_REQUESTS {
break result;
}
crate::util::sleep(Duration::from_millis(250)).await;
};
let request_result = self
.request(method.clone(), endpoint, body.as_ref().cloned())
.await?;

let status = request_result.0;
let headers = request_result.1;
let body = request_result.2;
Expand Down Expand Up @@ -1868,6 +1842,94 @@ impl<'agent> IntoFuture for UpdateBuilder<'agent> {
}
}

/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync {
/// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError>;
}
#[cfg(not(target_family = "wasm"))]
#[async_trait]
impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
for<'a> <&'a Self as Service<Request>>::Future: Send,
T: Send + Sync + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
mut self: &'a Self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError> {
let mut retry_count = 0;
loop {
match Service::call(&mut self, req()?).await {
Err(err) => {
// Network-related errors can be retried.
if err.is_connect() {
if retry_count >= max_retries {
return Err(AgentError::TransportError(err));
}
retry_count += 1;
}
}
Ok(resp) => return Ok(resp),
}
}
}
}

#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
T: Send + Sync + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
mut self: &'a Self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
_: usize,
) -> Result<Response, AgentError> {
Ok(Service::call(&mut self, req()?).await?)
}
}

struct Retry429Logic {
client: Client,
}

#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl HttpService for Retry429Logic {
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
_max_retries: usize,
) -> Result<Response, AgentError> {
loop {
#[cfg(not(target_family = "wasm"))]
let resp = self.client.call(req, _max_retries).await?;
// Client inconveniently does not implement Service on wasm
#[cfg(target_family = "wasm")]
let resp = self.client.execute(req()?).await?;
if resp.status() == StatusCode::TOO_MANY_REQUESTS {
crate::util::sleep(Duration::from_millis(250)).await;
continue;
} else {
break Ok(resp);
}
}
}
}

#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
use super::*;
Expand Down

0 comments on commit 8967a7f

Please sign in to comment.