Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add experimental feature for synchronous update calls #572

Merged
merged 11 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ic-ref.yml
Copy link
Member Author

@DSharifi DSharifi Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the DFX version since CI would fail when it pulls the current latest stable,0.21, which uses a replica version that hasn't enabled the new endpoint for synchronous ingress messages.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install dfx
uses: dfinity/setup-dfx@main
with:
dfx-version: "latest"
dfx-version: "0.22.0-beta.0"

- name: Cargo cache
uses: actions/cache@v4
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Changed `ic_utils::interfaces::management_canister::builders::InstallMode::Upgrade` variant to be `Option<CanisterUpgradeOptions>`:
* `CanisterUpgradeOptions` is a new struct which covers the new upgrade option: `wasm_memory_persistence: Option<WasmMemoryPersistence>`.
* `WasmMemoryPersistence` is a new enum which controls Wasm main memory retention on upgrades which has two variants: `Keep` and `Replace`.
* Added an experimental feature, `experimental_sync_call`, to enable synchronous update calls. The feature adds a toggle to the `ReqwestTransport` and `HyperTransport` to enable synchronous update calls.

## [0.36.0] - 2024-06-04

Expand Down
1 change: 1 addition & 0 deletions ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ web-sys = { version = "0.3", features = [

[features]
default = ["pem", "reqwest"]
experimental_sync_call = []
reqwest = ["dep:reqwest"]
hyper = [
"dep:hyper",
Expand Down
4 changes: 4 additions & 0 deletions ic-agent/src/agent/agent_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ pub enum AgentError {
/// Route provider failed to generate a url for some reason.
#[error("Route provider failed to generate url: {0}")]
RouteProviderError(String),

/// Invalid HTTP response.
#[error("Invalid HTTP response: {0}")]
InvalidHttpResponse(String),
}

impl PartialEq for AgentError {
Expand Down
68 changes: 56 additions & 12 deletions ic-agent/src/agent/agent_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,33 @@ use crate::{
use candid::{Encode, Nat};
use futures_util::FutureExt;
use ic_certification::{Delegation, Label};
use ic_transport_types::{NodeSignature, QueryResponse, RejectCode, RejectResponse, ReplyResponse};
use ic_transport_types::{
NodeSignature, QueryResponse, RejectCode, RejectResponse, ReplyResponse, TransportCallResponse,
};
use reqwest::Client;
use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
use wasm_bindgen_test::wasm_bindgen_test;

use crate::agent::http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider};
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

fn make_transport(url: &str) -> ReqwestTransport {
let transport = ReqwestTransport::create(url).unwrap();
#[cfg(feature = "experimental_sync_call")]
{
transport.with_use_call_v3_endpoint()
}
#[cfg(not(feature = "experimental_sync_call"))]
{
transport
}
}

fn make_agent(url: &str) -> Agent {
Agent::builder()
.with_transport(ReqwestTransport::create(url).unwrap())
.with_transport(make_transport(url))
.with_verify_query_signatures(false)
.build()
.unwrap()
Expand Down Expand Up @@ -214,7 +227,20 @@ async fn query_rejected() -> Result<(), AgentError> {
#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg_attr(target_family = "wasm", wasm_bindgen_test)]
async fn call_error() -> Result<(), AgentError> {
let (call_mock, url) = mock("POST", "/api/v2/canister/aaaaa-aa/call", 500, vec![], None).await;
let version = if cfg!(feature = "experimental_sync_call") {
"3"
} else {
"2"
};

let (call_mock, url) = mock(
"POST",
format!("/api/v{version}/canister/aaaaa-aa/call").as_str(),
500,
vec![],
None,
)
.await;

let agent = make_agent(&url);

Expand All @@ -234,17 +260,25 @@ async fn call_error() -> Result<(), AgentError> {
#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg_attr(target_family = "wasm", wasm_bindgen_test)]
async fn call_rejected() -> Result<(), AgentError> {
let reject_body = RejectResponse {
let reject_response = RejectResponse {
reject_code: RejectCode::SysTransient,
reject_message: "Test reject message".to_string(),
error_code: Some("Test error code".to_string()),
};

let reject_body = TransportCallResponse::NonReplicatedRejection(reject_response.clone());

let body = serde_cbor::to_vec(&reject_body).unwrap();

let version = if cfg!(feature = "experimental_sync_call") {
"3"
} else {
"2"
};

let (call_mock, url) = mock(
"POST",
"/api/v2/canister/aaaaa-aa/call",
format!("/api/v{version}/canister/aaaaa-aa/call").as_str(),
200,
body,
Some("application/cbor"),
Expand All @@ -261,7 +295,7 @@ async fn call_rejected() -> Result<(), AgentError> {

assert_mock(call_mock).await;

let expected_response = Err(AgentError::UncertifiedReject(reject_body));
let expected_response = Err(AgentError::UncertifiedReject(reject_response));
assert_eq!(expected_response, result);

Ok(())
Expand All @@ -270,17 +304,27 @@ async fn call_rejected() -> Result<(), AgentError> {
#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg_attr(target_family = "wasm", wasm_bindgen_test)]
async fn call_rejected_without_error_code() -> Result<(), AgentError> {
let reject_body = RejectResponse {
let non_replicated_reject = RejectResponse {
reject_code: RejectCode::SysTransient,
reject_message: "Test reject message".to_string(),
error_code: None,
};

let reject_body = TransportCallResponse::NonReplicatedRejection(non_replicated_reject.clone());

let canister_id_str = "aaaaa-aa";

let body = serde_cbor::to_vec(&reject_body).unwrap();

let version = if cfg!(feature = "experimental_sync_call") {
"3"
} else {
"2"
};

let (call_mock, url) = mock(
"POST",
"/api/v2/canister/aaaaa-aa/call",
format!("/api/v{version}/canister/{}/call", canister_id_str).as_str(),
200,
body,
Some("application/cbor"),
Expand All @@ -290,14 +334,14 @@ async fn call_rejected_without_error_code() -> Result<(), AgentError> {
let agent = make_agent(&url);

let result = agent
.update(&Principal::management_canister(), "greet")
.update(&Principal::from_str(canister_id_str).unwrap(), "greet")
.with_arg([])
.call()
.await;

assert_mock(call_mock).await;

let expected_response = Err(AgentError::UncertifiedReject(reject_body));
let expected_response = Err(AgentError::UncertifiedReject(non_replicated_reject));
assert_eq!(expected_response, result);

Ok(())
Expand Down
116 changes: 83 additions & 33 deletions ic-agent/src/agent/http_transport/hyper_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use hyper::{header::CONTENT_TYPE, Method, Request, Response};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
use ic_transport_types::{RejectResponse, TransportCallResponse};
use tower::Service;

use crate::{
Expand All @@ -22,7 +23,7 @@ use crate::{
AgentFuture, Transport,
},
export::Principal,
AgentError, RequestId,
AgentError,
};

/// A [`Transport`] using [`hyper`] to make HTTP calls to the Internet Computer.
Expand All @@ -34,6 +35,7 @@ pub struct HyperTransport<B1, S = Client<HttpsConnector<HttpConnector>, B1>> {
#[allow(dead_code)]
max_tcp_error_retries: usize,
service: S,
use_call_v3_endpoint: bool,
}

/// Trait representing the contraints on [`HttpBody`] that [`HyperTransport`] requires
Expand All @@ -56,7 +58,7 @@ where
type BodyError = B::Error;
}

/// Trait representing the contraints on [`Service`] that [`HyperTransport`] requires.
/// Trait representing the constraints on [`Service`] that [`HyperTransport`] requires.
pub trait HyperService<B1: HyperBody>:
Send
+ Sync
Expand Down Expand Up @@ -124,6 +126,7 @@ where
service,
max_response_body_size: None,
max_tcp_error_retries: 0,
use_call_v3_endpoint: false,
})
}

Expand All @@ -143,12 +146,27 @@ where
}
}

/// Use call v3 endpoint for synchronous update calls.
/// __This is an experimental feature, and should not be used in production,
/// as the endpoint is not available yet on the mainnet IC.__
///
/// By enabling this feature, the agent will use the `v3` endpoint for update calls,
/// which is synchronous. This means the replica will wait for a certificate for the call,
/// meaning the agent will not need to poll for the certificate.
#[cfg(feature = "experimental_sync_call")]
pub fn with_use_call_v3_endpoint(self) -> Self {
Self {
use_call_v3_endpoint: true,
..self
}
}

async fn request(
&self,
method: Method,
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<Vec<u8>, AgentError> {
) -> Result<(StatusCode, Vec<u8>), AgentError> {
let body = body.unwrap_or_default();
fn map_error<E: Error + Send + Sync + 'static>(err: E) -> AgentError {
if any::TypeId::of::<E>() == any::TypeId::of::<AgentError>() {
Expand Down Expand Up @@ -253,7 +271,7 @@ where
content: body,
}))
} else {
Ok(body)
Ok((status, body))
}
}
}
Expand All @@ -267,12 +285,36 @@ where
&self,
effective_canister_id: Principal,
envelope: Vec<u8>,
_request_id: RequestId,
) -> AgentFuture<()> {
) -> AgentFuture<TransportCallResponse> {
Box::pin(async move {
let endpoint = &format!("canister/{effective_canister_id}/call");
self.request(Method::POST, endpoint, Some(envelope)).await?;
Ok(())
let api_version = if self.use_call_v3_endpoint {
"v3"
} else {
"v2"
};

let endpoint = format!(
"api/{}/canister/{}/call",
&api_version,
effective_canister_id.to_text()
);
let (status_code, response_body) = self
.request(Method::POST, &endpoint, Some(envelope))
.await?;

if status_code == StatusCode::ACCEPTED {
return Ok(TransportCallResponse::Accepted);
}

// status_code == OK (200)
if self.use_call_v3_endpoint {
serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
} else {
let reject_response = serde_cbor::from_slice::<RejectResponse>(&response_body)
.map_err(AgentError::InvalidCborData)?;

Err(AgentError::UncertifiedReject(reject_response))
}
})
}

Expand All @@ -282,29 +324,37 @@ where
envelope: Vec<u8>,
) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let endpoint = &format!("canister/{effective_canister_id}/read_state");
self.request(Method::POST, endpoint, Some(envelope)).await
let endpoint = format!("canister/{effective_canister_id}/read_state",);
self.request(Method::POST, &endpoint, Some(envelope))
.await
.map(|(_, body)| body)
})
}

fn read_subnet_state(&self, subnet_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let endpoint = &format!("subnet/{subnet_id}/read_state");
self.request(Method::POST, endpoint, Some(envelope)).await
let endpoint = format!("api/v2/subnet/{subnet_id}/read_state",);
self.request(Method::POST, &endpoint, Some(envelope))
.await
.map(|(_, body)| body)
})
}

fn query(&self, effective_canister_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let endpoint = &format!("canister/{effective_canister_id}/query");
self.request(Method::POST, endpoint, Some(envelope)).await
let endpoint = format!("api/v2/canister/{effective_canister_id}/query",);
self.request(Method::POST, &endpoint, Some(envelope))
.await
.map(|(_, body)| body)
})
}

fn status(&self) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let endpoint = "status".to_string();
self.request(Method::GET, &endpoint, None).await
let endpoint = "api/v2/status";
self.request(Method::GET, endpoint, None)
.await
.map(|(_, body)| body)
})
}
}
Expand Down Expand Up @@ -341,21 +391,21 @@ mod test {
);
}

test("https://ic0.app", "https://ic0.app/api/v2/");
test("https://IC0.app", "https://ic0.app/api/v2/");
test("https://foo.ic0.app", "https://ic0.app/api/v2/");
test("https://foo.IC0.app", "https://ic0.app/api/v2/");
test("https://foo.Ic0.app", "https://ic0.app/api/v2/");
test("https://foo.iC0.app", "https://ic0.app/api/v2/");
test("https://foo.bar.ic0.app", "https://ic0.app/api/v2/");
test("https://ic0.app/foo/", "https://ic0.app/foo/api/v2/");
test("https://foo.ic0.app/foo/", "https://ic0.app/foo/api/v2/");

test("https://ic1.app", "https://ic1.app/api/v2/");
test("https://foo.ic1.app", "https://foo.ic1.app/api/v2/");
test("https://ic0.app.ic1.app", "https://ic0.app.ic1.app/api/v2/");

test("https://fooic0.app", "https://fooic0.app/api/v2/");
test("https://fooic0.app.ic0.app", "https://ic0.app/api/v2/");
test("https://ic0.app", "https://ic0.app/");
test("https://IC0.app", "https://ic0.app/");
test("https://foo.ic0.app", "https://ic0.app/");
test("https://foo.IC0.app", "https://ic0.app/");
test("https://foo.Ic0.app", "https://ic0.app/");
test("https://foo.iC0.app", "https://ic0.app/");
test("https://foo.bar.ic0.app", "https://ic0.app/");
test("https://ic0.app/foo/", "https://ic0.app/foo/");
test("https://foo.ic0.app/foo/", "https://ic0.app/foo/");

test("https://ic1.app", "https://ic1.app/");
test("https://foo.ic1.app", "https://foo.ic1.app/");
test("https://ic0.app.ic1.app", "https://ic0.app.ic1.app/");

test("https://fooic0.app", "https://fooic0.app/");
test("https://fooic0.app.ic0.app", "https://ic0.app/");
}
}
Loading