diff --git a/CHANGELOG.md b/CHANGELOG.md index c350b3393..e997f52fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next release +- refactor: use `hyper` & `tower` instead of `reqwest` for feeder client - fix(namespace): versioning now works for methods without `starknet` namesapce - fix(compile): wrong struct field being used in state map conversion - fix: contract 0 state diff fixed diff --git a/Cargo.lock b/Cargo.lock index 1f8b272e5..47434ceb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4003,16 +4003,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "forwarded-header-value" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" -dependencies = [ - "nonempty", - "thiserror", -] - [[package]] name = "fragile" version = "2.0.0" @@ -4541,9 +4531,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -4552,6 +4542,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -4581,7 +4572,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "rustls 0.23.14", "rustls-pki-types", @@ -4598,7 +4589,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -4617,7 +4608,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -4782,12 +4773,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "ip_network" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa2f047c0a98b2f299aa5d6d7088443570faae494e9ae1305e48be000c9e0eb1" - [[package]] name = "ipnet" version = "2.10.1" @@ -5228,11 +5213,9 @@ dependencies = [ "clap", "env_logger 0.11.5", "fdlimit", - "forwarded-header-value", "futures", "governor", "hyper 0.14.30", - "ip_network", "jsonrpsee", "log", "mc-block-import", @@ -5249,16 +5232,13 @@ dependencies = [ "mp-chain-config", "mp-convert", "mp-utils", - "primitive-types", "rand", "rayon", "reqwest 0.12.8", "serde", "serde_json", "serde_yaml", - "starknet-core 0.11.0", "starknet-providers", - "starknet-signers", "starknet_api", "thiserror", "tokio", @@ -5458,7 +5438,12 @@ dependencies = [ "anyhow", "bytes", "flate2", - "hyper 0.14.30", + "futures", + "http 1.1.0", + "http-body-util", + "hyper 1.5.0", + "hyper-tls", + "hyper-util", "log", "mc-db", "mc-rpc", @@ -5466,15 +5451,14 @@ dependencies = [ "mp-class", "mp-gateway", "mp-utils", - "reqwest 0.12.8", "rstest 0.18.2", "serde", "serde_json", "starknet-core 0.11.0", - "starknet-signers", "starknet-types-core", "thiserror", "tokio", + "tower 0.4.13", "url", ] @@ -5546,7 +5530,6 @@ dependencies = [ "mp-receipt", "mp-state-update", "mp-transactions", - "paste", "rstest 0.18.2", "serde_json", "starknet-core 0.11.0", @@ -5563,6 +5546,7 @@ dependencies = [ "anyhow", "futures", "httpmock", + "hyper 1.5.0", "log", "mc-block-import", "mc-db", @@ -5577,7 +5561,6 @@ dependencies = [ "mp-transactions", "mp-utils", "regex", - "reqwest 0.12.8", "rstest 0.18.2", "serde_json", "starknet-core 0.11.0", @@ -5848,7 +5831,6 @@ dependencies = [ "serde_yaml", "starknet-core 0.11.0", "starknet-crypto 0.7.2", - "starknet-types-core", "tokio", "url", ] @@ -5905,12 +5887,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nonempty" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" - [[package]] name = "nonzero_ext" version = "0.3.0" @@ -6841,7 +6817,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-rustls 0.27.3", "hyper-tls", "hyper-util", @@ -8650,6 +8626,8 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 65d104539..38869da94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,7 +161,11 @@ jsonrpsee = { version = "0.22", default-features = false, features = [ tower = { version = "0.4", features = ["util"] } tower-http = { version = "0.4", features = ["cors"] } governor = "0.6" -hyper = { version = "0.14", features = ["server"] } +hyper = { version = "1.5.0", features = ["full"] } +hyper-tls = "0.6" +hyper-util = "0.1.9" +http = "1.1.0" +http-body-util = "0.1.2" ip_network = "0.4" lazy_static = { version = "1.4", default-features = false } once_cell = "1.19" diff --git a/crates/client/gateway/Cargo.toml b/crates/client/gateway/Cargo.toml index b53130c98..3b59144df 100644 --- a/crates/client/gateway/Cargo.toml +++ b/crates/client/gateway/Cargo.toml @@ -26,19 +26,23 @@ mp-utils.workspace = true # Starknet starknet-core.workspace = true -starknet-signers.workspace = true starknet-types-core.workspace = true # Other anyhow.workspace = true bytes.workspace = true -hyper.workspace = true +futures.workspace = true +http-body-util.workspace = true +http.workspace = true +hyper = { workspace = true, features = ["full"] } +hyper-tls.workspace = true +hyper-util.workspace = true log.workspace = true -reqwest.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true tokio.workspace = true +tower = { version = "0.4", features = ["timeout", "retry", "util", "limit"] } url.workspace = true [dev-dependencies] diff --git a/crates/client/gateway/src/client/builder.rs b/crates/client/gateway/src/client/builder.rs index 2eb792950..1a0c49984 100644 --- a/crates/client/gateway/src/client/builder.rs +++ b/crates/client/gateway/src/client/builder.rs @@ -1,26 +1,56 @@ -use reqwest::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Client, -}; +use futures::FutureExt; +use http::StatusCode; +use hyper::body::Incoming; +use hyper::header::{HeaderMap, HeaderName, HeaderValue}; +use hyper::{Request, Response}; +use hyper_tls::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Client; +use hyper_util::rt::TokioExecutor; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; +use tower::retry; +use tower::Service; +use tower::{retry::Retry, timeout::Timeout}; use url::Url; +type HttpsClient = Client, String>; +type TimeoutRetryClient = Retry>; +pub type PausedClient = PauseLayerMiddleware; #[derive(Debug, Clone)] pub struct FeederClient { - pub(crate) client: Client, + pub(crate) client: PausedClient, #[allow(dead_code)] pub(crate) gateway_url: Url, pub(crate) feeder_gateway_url: Url, pub(crate) headers: HeaderMap, + pause_until: Arc>>, // Locker for global pause in rate-limiting } impl FeederClient { pub fn new(gateway_url: Url, feeder_gateway_url: Url) -> Self { - Self { client: Client::new(), gateway_url, feeder_gateway_url, headers: HeaderMap::new() } + let pause_until = Arc::new(RwLock::new(None)); + let connector = HttpsConnector::new(); + let base_client = Client::builder(TokioExecutor::new()).build::<_, String>(connector); + + let timeout_layer = Timeout::new(base_client, Duration::from_secs(20)); // Timeout after 20 seconds + let retry_policy = RetryPolicy::new(5, Duration::from_secs(1), Arc::clone(&pause_until)); // Retry 5 times with 1 second backoff + let retry_layer = Retry::new(retry_policy, timeout_layer); + let client = PauseLayerMiddleware::new(retry_layer, Arc::clone(&pause_until)); + + Self { client, gateway_url, feeder_gateway_url, headers: HeaderMap::new(), pause_until } } pub fn new_with_headers(gateway_url: Url, feeder_gateway_url: Url, headers: &[(HeaderName, HeaderValue)]) -> Self { + let feeder_client = Self::new(gateway_url, feeder_gateway_url); let headers = headers.iter().cloned().collect(); - Self { client: Client::new(), gateway_url, feeder_gateway_url, headers } + + Self { headers, ..feeder_client } } pub fn add_header(&mut self, name: HeaderName, value: HeaderValue) { @@ -48,4 +78,175 @@ impl FeederClient { .expect("Failed to parse Starknet Alpha Sepolia feeder gateway url. This should not fail in prod."), ) } + + pub async fn request_with_pause_handling(&self, f: F) -> Result> + where + F: FnOnce() -> R, + { + let pause_until = self.pause_until.clone(); + + // Check if a pause is active + let pause_duration = { + let lock = pause_until.read().await; + if let Some(pause_instant) = *lock { + let now = Instant::now(); + if pause_instant > now { + Some(pause_instant - now) + } else { + *pause_until.write().await = None; + None + } + } else { + None + } + }; + + // Wait for the pause duration if needed + if let Some(duration) = pause_duration { + tokio::time::sleep(duration).await; + } + + Ok(f()) + } +} + +#[derive(Clone, Debug)] +pub struct RetryPolicy { + max_retries: usize, + backoff: Duration, + pause_until: Arc>>, +} + +impl RetryPolicy { + pub fn new(max_retries: usize, backoff: Duration, pause_until: Arc>>) -> Self { + RetryPolicy { max_retries, backoff, pause_until } + } +} + +impl retry::Policy, Response, Box> for RetryPolicy { + type Future = Pin + Send>>; + + fn retry( + &self, + _: &Request, + result: Result<&Response, &Box>, + ) -> Option { + let pause_until = self.pause_until.clone(); + + match result { + Ok(response) => { + if response.status() == StatusCode::TOO_MANY_REQUESTS { + let retry_after = get_retry_after(response).unwrap_or(Duration::from_secs(10)); // Default 10 seconds + + let next_policy = self.clone(); + let fut = async move { + if (*pause_until.read().await).is_none() { + log::info!("The fetching process has been rate limited, retrying"); + } + + let mut pause_lock = pause_until.write().await; + *pause_lock = Some(Instant::now() + retry_after); + + // wait for the retry_after duration + tokio::time::sleep(retry_after).await; + + next_policy + } + .boxed(); + Some(fut) + } else { + None + } + } + Err(_) if self.max_retries > 0 => { + // If the request failed, retry after backoff duration + let next_policy = RetryPolicy { + max_retries: self.max_retries - 1, + backoff: self.backoff, + pause_until: self.pause_until.clone(), + }; + let sleep = tokio::time::sleep(self.backoff); + let fut = async move { + sleep.await; + next_policy + } + .boxed(); + Some(fut) + } + _ => None, // No more retries + } + } + + fn clone_request(&self, req: &Request) -> Option> { + Some(req.clone()) + } +} + +fn get_retry_after(response: &Response) -> Option { + if let Some(retry_after_header) = response.headers().get("Retry-After") { + if let Ok(retry_after_str) = retry_after_header.to_str() { + if let Ok(retry_seconds) = retry_after_str.parse::() { + return Some(Duration::from_secs(retry_seconds)); + } + } + } + None +} + +#[derive(Clone, Debug)] +pub struct PauseLayerMiddleware { + inner: S, + pause_until: Arc>>, +} + +impl PauseLayerMiddleware { + pub fn new(inner: S, pause_until: Arc>>) -> Self { + PauseLayerMiddleware { inner, pause_until } + } +} + +impl Service> for PauseLayerMiddleware +where + S: Service, Response = Response, Error = Box> + + Clone + + Send + + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let pause_until = self.pause_until.clone(); + let mut inner = self.inner.clone(); + + async move { + // Check if a pause is active + let pause_duration = { + let lock = pause_until.read().await; + if let Some(pause_instant) = *lock { + let now = Instant::now(); + if pause_instant > now { + Some(pause_instant - now) + } else { + None + } + } else { + None + } + }; + + if let Some(duration) = pause_duration { + tokio::time::sleep(duration).await; + } + + inner.call(req).await + } + .boxed() + } } diff --git a/crates/client/gateway/src/client/methods.rs b/crates/client/gateway/src/client/methods.rs index a6440c039..5805a1367 100644 --- a/crates/client/gateway/src/client/methods.rs +++ b/crates/client/gateway/src/client/methods.rs @@ -1,7 +1,7 @@ use super::{builder::FeederClient, request_builder::RequestBuilder}; use crate::error::{SequencerError, StarknetError}; use mp_block::{BlockId, BlockTag}; -use mp_class::{CompressedLegacyContractClass, ContractClass, FlattenedSierraClass}; +use mp_class::{ContractClass, FlattenedSierraClass}; use mp_gateway::{ block::{ProviderBlock, ProviderBlockPending, ProviderBlockPendingMaybe, ProviderBlockSignature}, state_update::{ @@ -9,6 +9,7 @@ use mp_gateway::{ ProviderStateUpdateWithBlockPending, ProviderStateUpdateWithBlockPendingMaybe, }, }; +use serde_json::Value; use starknet_core::types::{contract::legacy::LegacyContractClass, Felt}; use std::{borrow::Cow, sync::Arc}; @@ -85,16 +86,17 @@ impl FeederClient { .with_block_id(block_id) .with_class_hash(class_hash); - match request.send_get::().await { - Ok(class_sierra) => Ok(ContractClass::Sierra(Arc::new(class_sierra))), - Err(SequencerError::DeserializeBody { serde_error: _, body }) => { - // if it failed with flattebed sierra, it might be a legacy class. - let class_legacy = serde_json::from_slice::(&body) - .map_err(|serde_error| SequencerError::DeserializeBody { serde_error, body })?; - let class_compressed: CompressedLegacyContractClass = class_legacy.compress()?.into(); - Ok(ContractClass::Legacy(Arc::new(class_compressed))) - } - Err(err) => Err(err), + let value = request.send_get::().await?; + + if value.get("sierra_program").is_some() { + let sierra: FlattenedSierraClass = serde_json::from_value(value)?; + Ok(ContractClass::Sierra(Arc::new(sierra))) + } else if value.get("program").is_some() { + let legacy: LegacyContractClass = serde_json::from_value(value)?; + Ok(ContractClass::Legacy(Arc::new(legacy.compress()?.into()))) + } else { + let err = serde::de::Error::custom("Unknown contract type".to_string()); + Err(SequencerError::DeserializeBody { serde_error: err }) } } } @@ -107,6 +109,7 @@ mod tests { Compression, }; use mp_block::BlockTag; + use mp_class::CompressedLegacyContractClass; use rstest::*; use serde::de::DeserializeOwned; use starknet_core::types::Felt; diff --git a/crates/client/gateway/src/client/request_builder.rs b/crates/client/gateway/src/client/request_builder.rs index 5f358c13c..cc0a7db58 100644 --- a/crates/client/gateway/src/client/request_builder.rs +++ b/crates/client/gateway/src/client/request_builder.rs @@ -1,26 +1,31 @@ use std::{borrow::Cow, collections::HashMap}; +use bytes::Buf; +use http::Method; +use http_body_util::BodyExt; +use hyper::body::Incoming; +use hyper::header::{HeaderName, HeaderValue, CONTENT_TYPE}; +use hyper::{HeaderMap, Request, Response, StatusCode, Uri}; use mp_block::{BlockId, BlockTag}; -use reqwest::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Client, Response, -}; use serde::de::DeserializeOwned; use starknet_types_core::felt::Felt; +use tower::Service; use url::Url; use crate::error::{SequencerError, StarknetError}; -#[derive(Debug, Clone)] +use super::builder::PausedClient; + +#[derive(Debug)] pub struct RequestBuilder<'a> { - client: &'a Client, + client: &'a PausedClient, url: Url, params: HashMap, String>, headers: HeaderMap, } impl<'a> RequestBuilder<'a> { - pub fn new(client: &'a Client, base_url: Url, headers: HeaderMap) -> Self { + pub fn new(client: &'a PausedClient, base_url: Url, headers: HeaderMap) -> Self { Self { client, url: base_url, params: HashMap::new(), headers } } @@ -71,8 +76,18 @@ impl<'a> RequestBuilder<'a> { unpack(self.send_get_raw().await?).await } - pub async fn send_get_raw(self) -> Result { - self.client.get(self.url).headers(self.headers).query(&self.params).send().await.map_err(Into::into) + pub async fn send_get_raw(self) -> Result, SequencerError> { + let uri = self.build_uri()?; + + let mut req_builder = Request::builder().method(Method::GET).uri(uri); + + req_builder.headers_mut().expect("Failed to get mutable reference to request headers").extend(self.headers); + + let req = req_builder.body(String::new())?; + + let response: Response = + self.client.clone().call(req).await.map_err(SequencerError::HttpCallError)?; + Ok(response) } #[allow(dead_code)] @@ -80,35 +95,52 @@ impl<'a> RequestBuilder<'a> { where T: DeserializeOwned, { - let mut request = self.client.post(self.url); + let uri = self.build_uri()?; + + let mut req_builder = Request::builder().method(Method::POST).uri(uri); + + req_builder.headers_mut().expect("Failed to get mutable reference to request headers").extend(self.headers); - for (key, value) in self.headers.iter() { - request = request.header(key, value); + let body = serde_json::to_string(&self.params)?; + + let req = req_builder.header(CONTENT_TYPE, "application/json").body(body)?; + + let response = self.client.clone().call(req).await.map_err(SequencerError::HttpCallError)?; + unpack(response).await + } + + fn build_uri(&self) -> Result { + let mut url = self.url.clone(); + let query: String = + self.params.iter().map(|(key, value)| format!("{}={}", key, value)).collect::>().join("&"); + + if !query.is_empty() { + url.set_query(Some(&query)); } - let response = request.form(&self.params).send().await?; - Ok(response.json().await?) + let uri: Uri = url.as_str().try_into().map_err(|_| SequencerError::InvalidUrl(url))?; + Ok(uri) } } -async fn unpack(response: reqwest::Response) -> Result +async fn unpack(response: Response) -> Result where T: ::serde::de::DeserializeOwned, { let http_status = response.status(); - if http_status == reqwest::StatusCode::TOO_MANY_REQUESTS { + let whole_body = response.collect().await?.aggregate(); + + if http_status == StatusCode::TOO_MANY_REQUESTS { return Err(SequencerError::StarknetError(StarknetError::rate_limited())); } else if !http_status.is_success() { - let body = response.bytes().await?; - let starknet_error = serde_json::from_slice::(&body) - .map_err(|serde_error| SequencerError::InvalidStarknetError { http_status, serde_error, body })?; + let starknet_error = serde_json::from_reader::<_, StarknetError>(whole_body.reader()) + .map_err(|serde_error| SequencerError::InvalidStarknetError { http_status, serde_error })?; return Err(starknet_error.into()); } - let body = response.bytes().await?; - let res = - serde_json::from_slice(&body).map_err(|serde_error| SequencerError::DeserializeBody { serde_error, body })?; + let res = serde_json::from_reader(whole_body.reader()) + .map_err(|serde_error| SequencerError::DeserializeBody { serde_error })?; Ok(res) } diff --git a/crates/client/gateway/src/error.rs b/crates/client/gateway/src/error.rs index 6ea6f2230..06ccfbffd 100644 --- a/crates/client/gateway/src/error.rs +++ b/crates/client/gateway/src/error.rs @@ -1,5 +1,4 @@ -use bytes::Bytes; -use reqwest::StatusCode; +use hyper::StatusCode; use serde::{Deserialize, Serialize}; use starknet_core::types::Felt; use starknet_types_core::felt::FromStrError; @@ -8,14 +7,22 @@ use starknet_types_core::felt::FromStrError; pub enum SequencerError { #[error("Starknet error: {0:#}")] StarknetError(#[from] StarknetError), - #[error("Reqwest error: {0:#}")] - ReqwestError(#[from] reqwest::Error), + #[error("Hyper error: {0:#}")] + ReqwestError(#[from] hyper::Error), + #[error("Invalid URL: {0}")] + InvalidUrl(url::Url), + #[error("HTTP error: {0:#}")] + HttpError(#[from] hyper::http::Error), + #[error("Error calling HTTP client: {0:#}")] + HttpCallError(Box), #[error("Error deserializing response: {serde_error:#}")] - DeserializeBody { serde_error: serde_json::Error, body: Bytes }, + DeserializeBody { serde_error: serde_json::Error }, + #[error("Error serializing request: {0:#}")] + SerializeRequest(#[from] serde_json::Error), #[error("Error compressing class: {0:#}")] CompressError(#[from] starknet_core::types::contract::CompressProgramError), #[error("Failed to parse returned error with http status {http_status}: {serde_error:#}")] - InvalidStarknetError { http_status: StatusCode, serde_error: serde_json::Error, body: Bytes }, + InvalidStarknetError { http_status: StatusCode, serde_error: serde_json::Error }, } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] diff --git a/crates/client/gateway/src/server/error.rs b/crates/client/gateway/src/server/error.rs index 73691c400..b22743f98 100644 --- a/crates/client/gateway/src/server/error.rs +++ b/crates/client/gateway/src/server/error.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Display}; -use hyper::{Body, Response}; +use hyper::Response; use mc_db::MadaraStorageError; use crate::error::StarknetError; @@ -22,8 +22,8 @@ impl From for GatewayError { } } -impl From for Response { - fn from(e: GatewayError) -> Response { +impl From for Response { + fn from(e: GatewayError) -> Response { match e { GatewayError::StarknetError(e) => e.into(), GatewayError::InternalServerError(msg) => internal_error_response(&msg), diff --git a/crates/client/gateway/src/server/handler.rs b/crates/client/gateway/src/server/handler.rs index 03d138e41..25e13fbd6 100644 --- a/crates/client/gateway/src/server/handler.rs +++ b/crates/client/gateway/src/server/handler.rs @@ -1,6 +1,8 @@ use std::sync::Arc; -use hyper::{body, Body, Request, Response}; +use bytes::Buf; +use http_body_util::BodyExt; +use hyper::{body::Incoming, Request, Response}; use mc_db::MadaraBackend; use mc_rpc::providers::AddTransactionProvider; use mp_block::{BlockId, BlockTag, MadaraBlock, MadaraMaybePendingBlockInfo, MadaraPendingBlock}; @@ -26,7 +28,10 @@ use super::{ }, }; -pub async fn handle_get_block(req: Request, backend: Arc) -> Result, GatewayError> { +pub async fn handle_get_block( + req: Request, + backend: Arc, +) -> Result, GatewayError> { let params = get_params_from_request(&req); let block_id = block_id_from_params(¶ms).or_internal_server_error("Retrieving block id")?; @@ -80,9 +85,9 @@ pub async fn handle_get_block(req: Request, backend: Arc) - } pub async fn handle_get_signature( - req: Request, + req: Request, backend: Arc, -) -> Result, GatewayError> { +) -> Result, GatewayError> { let params = get_params_from_request(&req); let block_id = block_id_from_params(¶ms).or_internal_server_error("Retrieving block id")?; @@ -112,9 +117,9 @@ pub async fn handle_get_signature( } pub async fn handle_get_state_update( - req: Request, + req: Request, backend: Arc, -) -> Result, GatewayError> { +) -> Result, GatewayError> { let params = get_params_from_request(&req); let block_id = block_id_from_params(¶ms).or_internal_server_error("Retrieving block id")?; @@ -217,9 +222,9 @@ pub async fn handle_get_state_update( } pub async fn handle_get_class_by_hash( - req: Request, + req: Request, backend: Arc, -) -> Result, GatewayError> { +) -> Result, GatewayError> { let params = get_params_from_request(&req); let block_id = block_id_from_params(¶ms).unwrap_or(BlockId::Tag(BlockTag::Latest)); @@ -240,7 +245,7 @@ pub async fn handle_get_class_by_hash( .as_ref() .serialize_to_json() .or_internal_server_error("Failed to serialize legacy class")?; - create_response_with_json_body(hyper::StatusCode::OK, &class) + create_response_with_json_body(hyper::StatusCode::OK, class) } }; @@ -248,9 +253,9 @@ pub async fn handle_get_class_by_hash( } pub async fn handle_get_compiled_class_by_class_hash( - req: Request, + req: Request, backend: Arc, -) -> Result, GatewayError> { +) -> Result, GatewayError> { let params = get_params_from_request(&req); let block_id = block_id_from_params(¶ms).unwrap_or(BlockId::Tag(BlockTag::Latest)); @@ -274,10 +279,10 @@ pub async fn handle_get_compiled_class_by_class_hash( .or_internal_server_error(format!("Retrieving compiled Sierra class from class hash {class_hash:x}"))? .ok_or(StarknetError::class_not_found(class_hash))?; - Ok(create_response_with_json_body(hyper::StatusCode::OK, class_compiled.as_ref())) + Ok(create_response_with_json_body(hyper::StatusCode::OK, class_compiled.0)) } -pub async fn handle_get_contract_addresses(backend: Arc) -> Result, GatewayError> { +pub async fn handle_get_contract_addresses(backend: Arc) -> Result, GatewayError> { let chain_config = &backend.chain_config(); Ok(create_json_response( hyper::StatusCode::OK, @@ -288,18 +293,18 @@ pub async fn handle_get_contract_addresses(backend: Arc) -> Resul )) } -pub async fn handle_get_public_key(backend: Arc) -> Result, GatewayError> { +pub async fn handle_get_public_key(backend: Arc) -> Result, GatewayError> { let public_key = &backend.chain_config().private_key.public; Ok(create_string_response(hyper::StatusCode::OK, format!("\"{:#x}\"", public_key))) } pub async fn handle_add_transaction( - req: Request, + req: Request, add_transaction_provider: Arc, -) -> Result, GatewayError> { - let whole_body = body::to_bytes(req.into_body()).await.or_internal_server_error("Failed to read request body")?; +) -> Result, GatewayError> { + let whole_body = req.collect().await.or_internal_server_error("Failed to read request body")?.aggregate(); - let transaction = serde_json::from_slice::(whole_body.as_ref()) + let transaction = serde_json::from_reader::<_, BroadcastedTransaction>(whole_body.reader()) .map_err(|e| GatewayError::StarknetError(StarknetError::malformed_request(e)))?; let response = match transaction { @@ -314,7 +319,7 @@ pub async fn handle_add_transaction( async fn declare_transaction( tx: BroadcastedDeclareTransaction, add_transaction_provider: Arc, -) -> Response { +) -> Response { match add_transaction_provider.add_declare_transaction(tx).await { Ok(result) => create_json_response(hyper::StatusCode::OK, &result), Err(e) => create_json_response(hyper::StatusCode::OK, &e), @@ -324,7 +329,7 @@ async fn declare_transaction( async fn deploy_account_transaction( tx: BroadcastedDeployAccountTransaction, add_transaction_provider: Arc, -) -> Response { +) -> Response { match add_transaction_provider.add_deploy_account_transaction(tx).await { Ok(result) => create_json_response(hyper::StatusCode::OK, &result), Err(e) => create_json_response(hyper::StatusCode::OK, &e), @@ -334,7 +339,7 @@ async fn deploy_account_transaction( async fn invoke_transaction( tx: BroadcastedInvokeTransaction, add_transaction_provider: Arc, -) -> Response { +) -> Response { match add_transaction_provider.add_invoke_transaction(tx).await { Ok(result) => create_json_response(hyper::StatusCode::OK, &result), Err(e) => create_json_response(hyper::StatusCode::OK, &e), diff --git a/crates/client/gateway/src/server/helpers.rs b/crates/client/gateway/src/server/helpers.rs index 49725c49b..bc3ab86e8 100644 --- a/crates/client/gateway/src/server/helpers.rs +++ b/crates/client/gateway/src/server/helpers.rs @@ -1,37 +1,37 @@ use std::collections::HashMap; -use hyper::{header, Body, Request, Response, StatusCode}; +use hyper::{body::Incoming, header, Request, Response, StatusCode}; use mp_block::{BlockId, BlockTag}; use serde::Serialize; use starknet_types_core::felt::Felt; use crate::error::{StarknetError, StarknetErrorCode}; -pub(crate) fn service_unavailable_response(service_name: &str) -> Response { +pub(crate) fn service_unavailable_response(service_name: &str) -> Response { Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body(Body::from(format!("{} Service disabled", service_name))) + .body(format!("{} Service disabled", service_name)) .expect("Failed to build SERVICE_UNAVAILABLE response with a valid status and body") } -pub(crate) fn not_found_response() -> Response { +pub(crate) fn not_found_response() -> Response { Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::from("Not Found")) + .body("Not Found".to_string()) .expect("Failed to build NOT_FOUND response with a valid status and body") } -pub(crate) fn internal_error_response(msg: &str) -> Response { +pub(crate) fn internal_error_response(msg: &str) -> Response { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("Internal Server Error: {msg}"))) + .body(format!("Internal Server Error: {msg}")) .expect("Failed to build INTERNAL_SERVER_ERROR response with a valid status and body") } /// Creates a JSON response with the given status code and a body that can be serialized to JSON. /// /// If the serialization fails, this function returns a 500 Internal Server Error response. -pub(crate) fn create_json_response(status: StatusCode, body: &T) -> Response +pub(crate) fn create_json_response(status: StatusCode, body: &T) -> Response where T: Serialize, { @@ -45,7 +45,7 @@ where }; // Build the response with the specified status code and serialized body - match Response::builder().status(status).header(header::CONTENT_TYPE, "application/json").body(Body::from(body)) { + match Response::builder().status(status).header(header::CONTENT_TYPE, "application/json").body(body) { Ok(response) => response, Err(e) => { log::error!("Failed to build response: {}", e); @@ -57,8 +57,7 @@ where /// Creates a JSON response with the given status code and a body that can be serialized to JSON. /// /// If the serialization fails, this function returns a 500 Internal Server Error response. -pub(crate) fn create_string_response(status: StatusCode, body: String) -> Response { - let body = Body::from(body); +pub(crate) fn create_string_response(status: StatusCode, body: String) -> Response { // Build the response with the specified status code and serialized body match Response::builder().status(status).body(body) { Ok(response) => response, @@ -70,13 +69,9 @@ pub(crate) fn create_string_response(status: StatusCode, body: String) -> Respon } /// Creates a JSON response with the given status code and a body that is already serialized to a string. -pub(crate) fn create_response_with_json_body(status: StatusCode, body: &str) -> Response { +pub(crate) fn create_response_with_json_body(status: StatusCode, body: String) -> Response { // Build the response with the specified status code and serialized body - match Response::builder() - .status(status) - .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(body.to_string())) - { + match Response::builder().status(status).header(header::CONTENT_TYPE, "application/json").body(body) { Ok(response) => response, Err(e) => { log::error!("Failed to build response: {}", e); @@ -85,7 +80,7 @@ pub(crate) fn create_response_with_json_body(status: StatusCode, body: &str) -> } } -pub(crate) fn get_params_from_request(req: &Request) -> HashMap { +pub(crate) fn get_params_from_request(req: &Request) -> HashMap { let query = req.uri().query().unwrap_or(""); let params = query.split('&'); let mut query_params = HashMap::new(); @@ -123,7 +118,7 @@ pub(crate) fn include_block_params(params: &HashMap) -> bool { params.get("includeBlock").map_or(false, |v| v == "true") } -impl From for hyper::Response { +impl From for Response { fn from(error: StarknetError) -> Self { create_json_response(hyper::StatusCode::BAD_REQUEST, &error) } diff --git a/crates/client/gateway/src/server/router.rs b/crates/client/gateway/src/server/router.rs index eaa79e02d..782aa032f 100644 --- a/crates/client/gateway/src/server/router.rs +++ b/crates/client/gateway/src/server/router.rs @@ -1,6 +1,6 @@ use std::{convert::Infallible, sync::Arc}; -use hyper::{Body, Method, Request, Response}; +use hyper::{body::Incoming, Method, Request, Response}; use mc_db::MadaraBackend; use mc_rpc::providers::AddTransactionProvider; @@ -12,15 +12,15 @@ use super::helpers::{not_found_response, service_unavailable_response}; // Main router to redirect to the appropriate sub-router pub(crate) async fn main_router( - req: Request, + req: Request, backend: Arc, add_transaction_provider: Arc, feeder_gateway_enable: bool, gateway_enable: bool, -) -> Result, Infallible> { +) -> Result, Infallible> { let path = req.uri().path().split('/').filter(|segment| !segment.is_empty()).collect::>().join("/"); match (path.as_ref(), feeder_gateway_enable, gateway_enable) { - ("health", _, _) => Ok(Response::new(Body::from("OK"))), + ("health", _, _) => Ok(Response::new("OK".to_string())), (path, true, _) if path.starts_with("feeder_gateway/") => feeder_gateway_router(req, path, backend).await, (path, _, true) if path.starts_with("gateway/") => gateway_router(req, path, add_transaction_provider).await, (path, false, _) if path.starts_with("feeder_gateway/") => Ok(service_unavailable_response("Feeder Gateway")), @@ -34,10 +34,10 @@ pub(crate) async fn main_router( // Router for requests related to feeder_gateway async fn feeder_gateway_router( - req: Request, + req: Request, path: &str, backend: Arc, -) -> Result, Infallible> { +) -> Result, Infallible> { match (req.method(), path) { (&Method::GET, "feeder_gateway/get_block") => { Ok(handle_get_block(req, backend).await.unwrap_or_else(Into::into)) @@ -69,10 +69,10 @@ async fn feeder_gateway_router( // Router for requests related to feeder async fn gateway_router( - req: Request, + req: Request, path: &str, add_transaction_provider: Arc, -) -> Result, Infallible> { +) -> Result, Infallible> { match (req.method(), req.uri().path()) { (&Method::POST, "gateway/add_transaction") => { Ok(handle_add_transaction(req, add_transaction_provider).await.unwrap_or_else(Into::into)) diff --git a/crates/client/gateway/src/server/service.rs b/crates/client/gateway/src/server/service.rs index 52468cbd2..f816b74c1 100644 --- a/crates/client/gateway/src/server/service.rs +++ b/crates/client/gateway/src/server/service.rs @@ -1,18 +1,15 @@ use std::{ - convert::Infallible, net::{Ipv4Addr, SocketAddr}, sync::Arc, }; use anyhow::Context; -use hyper::{ - service::{make_service_fn, service_fn}, - Server, -}; +use hyper::{server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; use mc_db::MadaraBackend; use mc_rpc::providers::AddTransactionProvider; use mp_utils::graceful_shutdown; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::Notify}; use super::router::main_router; @@ -34,33 +31,50 @@ pub async fn start_server( Ipv4Addr::LOCALHOST }; let addr = SocketAddr::new(listen_addr.into(), gateway_port); + let listener = TcpListener::bind(addr).await.with_context(|| format!("Opening socket server at {addr}"))?; - let socket = TcpListener::bind(addr).await.with_context(|| format!("Opening socket server at {addr}"))?; + log::info!("🌐 Gateway endpoint started at {}", addr); - let listener = hyper::server::conn::AddrIncoming::from_listener(socket) - .with_context(|| format!("Opening socket server at {addr}"))?; + let shutdown_notify = Arc::new(Notify::new()); - let make_service = make_service_fn(move |_| { - let db_backend = Arc::clone(&db_backend); - let add_transaction_provider = Arc::clone(&add_transaction_provider); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - main_router( - req, - Arc::clone(&db_backend), - Arc::clone(&add_transaction_provider), - feeder_gateway_enable, - gateway_enable, - ) - })) - } - }); + { + let shutdown_notify = Arc::clone(&shutdown_notify); + tokio::spawn(async move { + graceful_shutdown().await; + shutdown_notify.notify_waiters(); + }); + } - log::info!("🌐 Gateway endpoint started at {}", listener.local_addr()); + loop { + tokio::select! { + // Handle new incoming connections + Ok((stream, _)) = listener.accept() => { + let io = TokioIo::new(stream); - let server = Server::builder(listener).serve(make_service).with_graceful_shutdown(graceful_shutdown()); + let db_backend = Arc::clone(&db_backend); + let add_transaction_provider = Arc::clone(&add_transaction_provider); - server.await.context("gateway server")?; + tokio::task::spawn(async move { + let service = service_fn(move |req| { + main_router( + req, + Arc::clone(&db_backend), + Arc::clone(&add_transaction_provider), + feeder_gateway_enable, + gateway_enable, + ) + }); - Ok(()) + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + log::error!("Error serving connection: {:?}", err); + } + }); + }, + + // Await the shutdown signal + _ = shutdown_notify.notified() => { + break Ok(()); + } + } + } } diff --git a/crates/client/metrics/Cargo.toml b/crates/client/metrics/Cargo.toml index 0c3f0cc86..065346d40 100644 --- a/crates/client/metrics/Cargo.toml +++ b/crates/client/metrics/Cargo.toml @@ -22,7 +22,7 @@ mp-utils.workspace = true # Other anyhow.workspace = true async-trait.workspace = true -hyper.workspace = true +hyper = "0.14" log.workspace = true prometheus.workspace = true thiserror.workspace = true diff --git a/crates/client/metrics/src/lib.rs b/crates/client/metrics/src/lib.rs index ce6409f60..c1a6cd036 100644 --- a/crates/client/metrics/src/lib.rs +++ b/crates/client/metrics/src/lib.rs @@ -2,8 +2,10 @@ use std::net::{Ipv4Addr, SocketAddr}; use anyhow::Context; use hyper::{ + body::Body, + server::Server, service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, + Request, Response, StatusCode, }; use mp_utils::{service::Service, wait_or_graceful_shutdown, StopHandle}; use prometheus::{core::Collector, Encoder, TextEncoder}; diff --git a/crates/client/rpc/Cargo.toml b/crates/client/rpc/Cargo.toml index 473aa5add..cc28870a7 100644 --- a/crates/client/rpc/Cargo.toml +++ b/crates/client/rpc/Cargo.toml @@ -49,6 +49,5 @@ jsonrpsee = { workspace = true, default-features = true, features = [ "server", ] } log = { workspace = true, default-features = true } -paste = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/crates/client/sync/Cargo.toml b/crates/client/sync/Cargo.toml index e4b417838..77dd9aaa3 100644 --- a/crates/client/sync/Cargo.toml +++ b/crates/client/sync/Cargo.toml @@ -40,8 +40,8 @@ starknet_api.workspace = true # Other anyhow.workspace = true futures = { workspace = true, default-features = true } +hyper.workspace = true log.workspace = true -reqwest.workspace = true serde_json.workspace = true thiserror.workspace = true tokio = { workspace = true, features = [ diff --git a/crates/client/sync/src/fetch/fetchers.rs b/crates/client/sync/src/fetch/fetchers.rs index 124ebc28e..3b4d67e05 100644 --- a/crates/client/sync/src/fetch/fetchers.rs +++ b/crates/client/sync/src/fetch/fetchers.rs @@ -98,7 +98,7 @@ pub async fn fetch_pending_block_and_updates( Ok(block) => Ok(Some(block)), // Ignore (this is the case where we returned a closed block when we asked for a pending one) // When the FGW does not have a pending block, it can return the latest block instead - Err(SequencerError::DeserializeBody { body: _, serde_error }) => { + Err(SequencerError::DeserializeBody { serde_error }) => { log::debug!("Serde error when fetching the pending block: {serde_error:#}"); Ok(None) } diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index b657c6a34..4c6a7a3b1 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -1,11 +1,11 @@ use crate::l2::L2SyncConfig; use anyhow::Context; use fetch::fetchers::FetchConfig; +use hyper::header::{HeaderName, HeaderValue}; use mc_block_import::BlockImporter; use mc_db::MadaraBackend; use mc_gateway::client::builder::FeederClient; use mc_telemetry::TelemetryHandle; -use reqwest::header::{HeaderName, HeaderValue}; use std::{sync::Arc, time::Duration}; pub mod fetch; diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 108723f39..3936a7c2c 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -38,9 +38,7 @@ mp-utils.workspace = true # Starknet blockifier.workspace = true -starknet-core.workspace = true starknet-providers.workspace = true -starknet-signers.workspace = true starknet_api.workspace = true # Other @@ -51,14 +49,11 @@ chrono = "0.4.38" clap = { workspace = true, features = ["derive", "env"] } env_logger.workspace = true fdlimit.workspace = true -forwarded-header-value = "0.1.1" futures = { workspace = true, features = ["thread-pool"] } governor.workspace = true -hyper.workspace = true -ip_network.workspace = true +hyper = { version = "0.14", features = ["server"] } jsonrpsee.workspace = true log.workspace = true -primitive-types.workspace = true rand.workspace = true rayon.workspace = true reqwest.workspace = true diff --git a/crates/primitives/class/src/lib.rs b/crates/primitives/class/src/lib.rs index 197843e9b..d84b73b1f 100644 --- a/crates/primitives/class/src/lib.rs +++ b/crates/primitives/class/src/lib.rs @@ -251,7 +251,7 @@ pub enum FunctionStateMutability { } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct CompiledSierra(String); +pub struct CompiledSierra(pub String); impl AsRef for CompiledSierra { fn as_ref(&self) -> &str { diff --git a/crates/primitives/utils/Cargo.toml b/crates/primitives/utils/Cargo.toml index 2642f9bab..b3b602e54 100644 --- a/crates/primitives/utils/Cargo.toml +++ b/crates/primitives/utils/Cargo.toml @@ -19,7 +19,6 @@ targets = ["x86_64-unknown-linux-gnu"] # Starknet starknet-core.workspace = true starknet-crypto.workspace = true -starknet-types-core.workspace = true # Other anyhow.workspace = true