From 8bcf81fae4a0a6c35c78e9f9e1298f6182d34580 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 13 Jul 2020 00:28:10 -0400 Subject: [PATCH 1/6] Add a `hyper` client implementation Allows using this client facade with a `tokio` executor --- Cargo.toml | 7 ++- src/hyper.rs | 175 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 4 ++ 3 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 src/hyper.rs diff --git a/Cargo.toml b/Cargo.toml index 2461321..619e95a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,13 @@ features = ["docs"] rustdoc-args = ["--cfg", "feature=\"docs\""] [features] -default = ["h1_client"] +default = ["h1_client", "hyper_client"] docs = ["h1_client"] h1_client = ["async-h1", "async-std", "async-native-tls"] native_client = ["curl_client", "wasm_client"] curl_client = ["isahc", "async-std"] wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures"] +hyper_client = ["hyper", "hyper-tls"] [dependencies] futures = { version = "0.3.1" } @@ -33,6 +34,10 @@ async-h1 = { version = "2.0.0", optional = true } async-std = { version = "1.6.0", default-features = false, optional = true } async-native-tls = { version = "0.3.1", optional = true } +# reqwest-client +hyper = { version = "0.13.6", features = ["tcp"], optional = true } +hyper-tls = { version = "0.4.3", optional = true } + # isahc-client [target.'cfg(not(target_arch = "wasm32"))'.dependencies] isahc = { version = "0.9", optional = true, default-features = false, features = ["http2"] } diff --git a/src/hyper.rs b/src/hyper.rs new file mode 100644 index 0000000..9df5bd9 --- /dev/null +++ b/src/hyper.rs @@ -0,0 +1,175 @@ +//! http-client implementation for reqwest +use super::{Error, HttpClient, Request, Response}; +use http_types::StatusCode; +use hyper::body::HttpBody; +use hyper_tls::HttpsConnector; +use std::convert::{TryFrom, TryInto}; +use std::str::FromStr; + +/// Hyper-based HTTP Client. +#[derive(Debug)] +struct HyperClient {} + +impl HttpClient for HyperClient { + fn send(&self, req: Request) -> futures::future::BoxFuture<'static, Result> { + Box::pin(async move { + let req = HyperHttpRequest::try_from(req).await?.into_inner(); + // UNWRAP: Scheme guaranteed to be "http" or "https" as part of conversion + let scheme = req.uri().scheme_str().unwrap(); + + let response = match scheme { + "http" => { + let client = hyper::Client::builder().build_http::(); + client.request(req).await + } + "https" => { + let https = HttpsConnector::new(); + let client = hyper::Client::builder().build::<_, hyper::Body>(https); + client.request(req).await + } + _ => unreachable!(), + }?; + + let resp = HttpTypesResponse::try_from(response).await?.into_inner(); + Ok(resp) + }) + } +} + +struct HyperHttpRequest { + inner: hyper::Request, +} + +impl HyperHttpRequest { + async fn try_from(mut value: http_types::Request) -> Result { + // Note: Much of this code was taken from the `http-types` compat implementation. Trying to + // figure out the feature flags to conditionally compile with compat support was rather + // difficult, so copying code was deemed a reasonable intermediate solution. + // Also, because converting the `http_types` body to bytes is async, we can't implement `TryFrom` + + // TODO: Do this without a `String` allocation + let method = hyper::Method::from_str(&value.method().to_string()).unwrap(); + + let version = value + .version() + .map(|v| match v { + http_types::Version::Http0_9 => Ok(hyper::Version::HTTP_09), + http_types::Version::Http1_0 => Ok(hyper::Version::HTTP_10), + http_types::Version::Http1_1 => Ok(hyper::Version::HTTP_11), + http_types::Version::Http2_0 => Ok(hyper::Version::HTTP_2), + http_types::Version::Http3_0 => Ok(hyper::Version::HTTP_3), + _ => Err(Error::from_str( + StatusCode::BadRequest, + "unrecognized HTTP version", + )), + }) + .or(Some(Ok(hyper::Version::default()))) + .unwrap()?; + + // UNWRAP: This unwrap is unjustified in `http-types`, need to check if it's actually safe. + let uri = hyper::Uri::try_from(&format!("{}", value.url())).unwrap(); + + // `HttpClient` depends on the scheme being either "http" or "https" + match uri.scheme_str() { + Some("http") | Some("https") => (), + _ => return Err(Error::from_str(StatusCode::BadRequest, "invalid scheme")), + }; + + let mut request = hyper::Request::builder(); + + // UNWRAP: Default builder is safe + let req_headers = request.headers_mut().unwrap(); + for (name, values) in &value { + // UNWRAP: http-types and http have equivalent validation rules + let name = hyper::header::HeaderName::from_str(name.as_str()).unwrap(); + + for value in values.iter() { + // UNWRAP: http-types and http have equivalent validation rules + let value = + hyper::header::HeaderValue::from_bytes(value.as_str().as_bytes()).unwrap(); + req_headers.append(&name, value); + } + } + + let body = value.body_bytes().await?; + let body = hyper::Body::from(body); + + let req = hyper::Request::builder() + .method(method) + .version(version) + .uri(uri) + .body(body)?; + + Ok(HyperHttpRequest { inner: req }) + } + + fn into_inner(self) -> hyper::Request { + self.inner + } +} + +struct HttpTypesResponse { + inner: http_types::Response, +} + +impl HttpTypesResponse { + + async fn try_from(value: hyper::Response) -> Result { + // Note: Much of this code was taken from the `http-types` compat implementation. Trying to + // figure out the feature flags to conditionally compile with compat support was rather + // difficult, so copying code was deemed a reasonable intermediate solution. + let (parts, mut body) = value.into_parts(); + + // UNWRAP: http and http-types implement the same status codes + let status: StatusCode = parts.status.as_u16().try_into().unwrap(); + + let version = match parts.version { + hyper::Version::HTTP_09 => Ok(http_types::Version::Http0_9), + hyper::Version::HTTP_10 => Ok(http_types::Version::Http1_0), + hyper::Version::HTTP_11 => Ok(http_types::Version::Http1_1), + hyper::Version::HTTP_2 => Ok(http_types::Version::Http2_0), + hyper::Version::HTTP_3 => Ok(http_types::Version::Http3_0), + // TODO: Is this realistically reachable, and should it be marked BadRequest? + _ => Err(Error::from_str( + StatusCode::BadRequest, + "unrecognized HTTP response version", + )), + }?; + + let body = match body.data().await { + None => None, + Some(Ok(b)) => Some(b), + Some(Err(_)) => { + return Err(Error::from_str( + StatusCode::BadRequest, + "unable to read HTTP response body", + )) + } + } + .map(|b| http_types::Body::from_bytes(b.to_vec())) + // TODO: How does `http-types` handle responses without bodies? + .unwrap_or(http_types::Body::from_bytes(Vec::new())); + + let mut res = Response::new(status); + res.set_version(Some(version)); + + for (name, value) in parts.headers { + // TODO: http_types uses an `unsafe` block here, should it be allowed for `hyper` as well? + let value = value.as_bytes().to_owned(); + let value = http_types::headers::HeaderValue::from_bytes(value)?; + + if let Some(name) = name { + let name = name.as_str(); + let name = http_types::headers::HeaderName::from_str(name)?; + res.insert_header(name, value); + } + } + + res.set_body(body); + Ok(HttpTypesResponse { inner: res }) + } + + fn into_inner(self) -> http_types::Response { + self.inner + } +} diff --git a/src/lib.rs b/src/lib.rs index 3e5c8cb..e9bd9d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,10 @@ use futures::future::BoxFuture; +#[cfg_attr(feature = "docs", doc(cfg(curl_client)))] +#[cfg(all(feature = "hyper_client", not(target_arch = "wasm32")))] +pub mod hyper; + #[cfg_attr(feature = "docs", doc(cfg(curl_client)))] #[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))] pub mod isahc; From 6b4c0a51813233777d26b22a0c5ff7b75cae6c1b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 13 Jul 2020 00:38:17 -0400 Subject: [PATCH 2/6] Don't include `hyper` as a default client --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 619e95a..012700a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ features = ["docs"] rustdoc-args = ["--cfg", "feature=\"docs\""] [features] -default = ["h1_client", "hyper_client"] +default = ["h1_client"] docs = ["h1_client"] h1_client = ["async-h1", "async-std", "async-native-tls"] native_client = ["curl_client", "wasm_client"] From 054bcf2e5b0f7b8b6cbde49e2f0c943aadf84e86 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 13 Jul 2020 00:42:12 -0400 Subject: [PATCH 3/6] Fix fmt --- src/hyper.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/hyper.rs b/src/hyper.rs index 9df5bd9..43f2e44 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -113,7 +113,6 @@ struct HttpTypesResponse { } impl HttpTypesResponse { - async fn try_from(value: hyper::Response) -> Result { // Note: Much of this code was taken from the `http-types` compat implementation. Trying to // figure out the feature flags to conditionally compile with compat support was rather From b2388fac6dd052e5548fd4f2f92046e0acbcd188 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 13 Jul 2020 10:19:57 -0400 Subject: [PATCH 4/6] Clean up TODO and implement an `echo` test case --- Cargo.toml | 3 +- src/hyper.rs | 118 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 92 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 012700a..e97f58e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,5 +68,6 @@ features = [ [dev-dependencies] async-std = { version = "1.6.0", features = ["unstable", "attributes"] } -tide = { version = "0.9.0" } portpicker = "0.1.0" +tide = { version = "0.9.0" } +tokio = { version = "0.2.21", features = ["macros"] } diff --git a/src/hyper.rs b/src/hyper.rs index 43f2e44..a131a11 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -1,6 +1,8 @@ //! http-client implementation for reqwest + use super::{Error, HttpClient, Request, Response}; -use http_types::StatusCode; +use http_types::headers::{HeaderName, HeaderValue}; +use http_types::{Method, StatusCode, Version}; use hyper::body::HttpBody; use hyper_tls::HttpsConnector; use std::convert::{TryFrom, TryInto}; @@ -8,7 +10,16 @@ use std::str::FromStr; /// Hyper-based HTTP Client. #[derive(Debug)] -struct HyperClient {} +pub struct HyperClient {} + +impl HyperClient { + /// Create a new client. + /// + /// There is no specific benefit to reusing instances of this client. + pub fn new() -> Self { + HyperClient {} + } +} impl HttpClient for HyperClient { fn send(&self, req: Request) -> futures::future::BoxFuture<'static, Result> { @@ -41,23 +52,32 @@ struct HyperHttpRequest { } impl HyperHttpRequest { - async fn try_from(mut value: http_types::Request) -> Result { - // Note: Much of this code was taken from the `http-types` compat implementation. Trying to - // figure out the feature flags to conditionally compile with compat support was rather - // difficult, so copying code was deemed a reasonable intermediate solution. - // Also, because converting the `http_types` body to bytes is async, we can't implement `TryFrom` - - // TODO: Do this without a `String` allocation - let method = hyper::Method::from_str(&value.method().to_string()).unwrap(); + async fn try_from(mut value: Request) -> Result { + let method = match value.method() { + Method::Get => hyper::Method::GET, + Method::Head => hyper::Method::HEAD, + Method::Post => hyper::Method::POST, + Method::Put => hyper::Method::PUT, + Method::Patch => hyper::Method::PATCH, + Method::Options => hyper::Method::OPTIONS, + Method::Trace => hyper::Method::TRACE, + Method::Connect => hyper::Method::CONNECT, + _ => { + return Err(Error::from_str( + StatusCode::BadRequest, + "unrecognized HTTP method", + )) + } + }; let version = value .version() .map(|v| match v { - http_types::Version::Http0_9 => Ok(hyper::Version::HTTP_09), - http_types::Version::Http1_0 => Ok(hyper::Version::HTTP_10), - http_types::Version::Http1_1 => Ok(hyper::Version::HTTP_11), - http_types::Version::Http2_0 => Ok(hyper::Version::HTTP_2), - http_types::Version::Http3_0 => Ok(hyper::Version::HTTP_3), + Version::Http0_9 => Ok(hyper::Version::HTTP_09), + Version::Http1_0 => Ok(hyper::Version::HTTP_10), + Version::Http1_1 => Ok(hyper::Version::HTTP_11), + Version::Http2_0 => Ok(hyper::Version::HTTP_2), + Version::Http3_0 => Ok(hyper::Version::HTTP_3), _ => Err(Error::from_str( StatusCode::BadRequest, "unrecognized HTTP version", @@ -109,14 +129,11 @@ impl HyperHttpRequest { } struct HttpTypesResponse { - inner: http_types::Response, + inner: Response, } impl HttpTypesResponse { async fn try_from(value: hyper::Response) -> Result { - // Note: Much of this code was taken from the `http-types` compat implementation. Trying to - // figure out the feature flags to conditionally compile with compat support was rather - // difficult, so copying code was deemed a reasonable intermediate solution. let (parts, mut body) = value.into_parts(); // UNWRAP: http and http-types implement the same status codes @@ -128,9 +145,8 @@ impl HttpTypesResponse { hyper::Version::HTTP_11 => Ok(http_types::Version::Http1_1), hyper::Version::HTTP_2 => Ok(http_types::Version::Http2_0), hyper::Version::HTTP_3 => Ok(http_types::Version::Http3_0), - // TODO: Is this realistically reachable, and should it be marked BadRequest? _ => Err(Error::from_str( - StatusCode::BadRequest, + StatusCode::BadGateway, "unrecognized HTTP response version", )), }?; @@ -140,26 +156,24 @@ impl HttpTypesResponse { Some(Ok(b)) => Some(b), Some(Err(_)) => { return Err(Error::from_str( - StatusCode::BadRequest, + StatusCode::BadGateway, "unable to read HTTP response body", )) } } .map(|b| http_types::Body::from_bytes(b.to_vec())) - // TODO: How does `http-types` handle responses without bodies? - .unwrap_or(http_types::Body::from_bytes(Vec::new())); + .unwrap_or(http_types::Body::empty()); let mut res = Response::new(status); res.set_version(Some(version)); for (name, value) in parts.headers { - // TODO: http_types uses an `unsafe` block here, should it be allowed for `hyper` as well? let value = value.as_bytes().to_owned(); - let value = http_types::headers::HeaderValue::from_bytes(value)?; + let value = HeaderValue::from_bytes(value)?; if let Some(name) = name { let name = name.as_str(); - let name = http_types::headers::HeaderName::from_str(name)?; + let name = HeaderName::from_str(name)?; res.insert_header(name, value); } } @@ -168,7 +182,55 @@ impl HttpTypesResponse { Ok(HttpTypesResponse { inner: res }) } - fn into_inner(self) -> http_types::Response { + fn into_inner(self) -> Response { self.inner } } + +#[cfg(test)] +mod tests { + use crate::{Error, HttpClient}; + use http_types::{Method, Request, Url}; + use hyper::service::{make_service_fn, service_fn}; + use std::time::Duration; + use tokio::sync::oneshot::channel; + + use super::HyperClient; + + async fn echo( + req: hyper::Request, + ) -> Result, hyper::Error> { + Ok(hyper::Response::new(req.into_body())) + } + + #[tokio::test] + async fn basic_functionality() { + let (send, recv) = channel::<()>(); + + let recv = async move { recv.await.unwrap_or(()) }; + + let addr = ([127, 0, 0, 1], portpicker::pick_unused_port().unwrap()).into(); + let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(echo)) }); + let server = hyper::Server::bind(&addr) + .serve(service) + .with_graceful_shutdown(recv); + + let client = HyperClient::new(); + let url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap(); + let mut req = Request::new(Method::Get, url); + req.set_body("hello"); + + let client = async move { + tokio::time::delay_for(Duration::from_millis(100)).await; + let mut resp = client.send(req).await?; + send.send(()).unwrap(); + assert_eq!(resp.body_string().await?, "hello"); + + Result::<(), Error>::Ok(()) + }; + + let (client_res, server_res) = tokio::join!(client, server); + assert!(client_res.is_ok()); + assert!(server_res.is_ok()); + } +} From 0d15dadeb8cc974289926601f0aac0face5ba6ad Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 13 Jul 2020 10:24:32 -0400 Subject: [PATCH 5/6] Slight cleanup for module exporting --- src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e9bd9d9..490eef6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,10 +16,6 @@ use futures::future::BoxFuture; -#[cfg_attr(feature = "docs", doc(cfg(curl_client)))] -#[cfg(all(feature = "hyper_client", not(target_arch = "wasm32")))] -pub mod hyper; - #[cfg_attr(feature = "docs", doc(cfg(curl_client)))] #[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))] pub mod isahc; @@ -36,6 +32,10 @@ pub mod native; #[cfg(feature = "h1_client")] pub mod h1; +#[cfg_attr(feature = "docs", doc(cfg(hyper_client)))] +#[cfg(feature = "hyper_client")] +pub mod hyper; + /// An HTTP Request type with a streaming body. pub type Request = http_types::Request; From e33e60faeb029769971c958977b0f45b6520f2d8 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Tue, 14 Jul 2020 09:20:45 -0400 Subject: [PATCH 6/6] Enable `hyperium_http` --- Cargo.toml | 2 +- src/hyper.rs | 66 +++++++--------------------------------------------- 2 files changed, 10 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e97f58e..8810188 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ hyper_client = ["hyper", "hyper-tls"] [dependencies] futures = { version = "0.3.1" } -http-types = "2.0.1" +http-types = { version = "2.0.1", features = ["hyperium_http"] } log = "0.4.7" # h1-client diff --git a/src/hyper.rs b/src/hyper.rs index a131a11..7141a15 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -2,10 +2,10 @@ use super::{Error, HttpClient, Request, Response}; use http_types::headers::{HeaderName, HeaderValue}; -use http_types::{Method, StatusCode, Version}; +use http_types::StatusCode; use hyper::body::HttpBody; use hyper_tls::HttpsConnector; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::str::FromStr; /// Hyper-based HTTP Client. @@ -53,43 +53,10 @@ struct HyperHttpRequest { impl HyperHttpRequest { async fn try_from(mut value: Request) -> Result { - let method = match value.method() { - Method::Get => hyper::Method::GET, - Method::Head => hyper::Method::HEAD, - Method::Post => hyper::Method::POST, - Method::Put => hyper::Method::PUT, - Method::Patch => hyper::Method::PATCH, - Method::Options => hyper::Method::OPTIONS, - Method::Trace => hyper::Method::TRACE, - Method::Connect => hyper::Method::CONNECT, - _ => { - return Err(Error::from_str( - StatusCode::BadRequest, - "unrecognized HTTP method", - )) - } - }; - - let version = value - .version() - .map(|v| match v { - Version::Http0_9 => Ok(hyper::Version::HTTP_09), - Version::Http1_0 => Ok(hyper::Version::HTTP_10), - Version::Http1_1 => Ok(hyper::Version::HTTP_11), - Version::Http2_0 => Ok(hyper::Version::HTTP_2), - Version::Http3_0 => Ok(hyper::Version::HTTP_3), - _ => Err(Error::from_str( - StatusCode::BadRequest, - "unrecognized HTTP version", - )), - }) - .or(Some(Ok(hyper::Version::default()))) - .unwrap()?; - // UNWRAP: This unwrap is unjustified in `http-types`, need to check if it's actually safe. let uri = hyper::Uri::try_from(&format!("{}", value.url())).unwrap(); - // `HttpClient` depends on the scheme being either "http" or "https" + // `HyperClient` depends on the scheme being either "http" or "https" match uri.scheme_str() { Some("http") | Some("https") => (), _ => return Err(Error::from_str(StatusCode::BadRequest, "invalid scheme")), @@ -114,13 +81,13 @@ impl HyperHttpRequest { let body = value.body_bytes().await?; let body = hyper::Body::from(body); - let req = hyper::Request::builder() - .method(method) - .version(version) + let request = request + .method(value.method()) + .version(value.version().map(|v| v.into()).unwrap_or_default()) .uri(uri) .body(body)?; - Ok(HyperHttpRequest { inner: req }) + Ok(HyperHttpRequest { inner: request }) } fn into_inner(self) -> hyper::Request { @@ -136,21 +103,6 @@ impl HttpTypesResponse { async fn try_from(value: hyper::Response) -> Result { let (parts, mut body) = value.into_parts(); - // UNWRAP: http and http-types implement the same status codes - let status: StatusCode = parts.status.as_u16().try_into().unwrap(); - - let version = match parts.version { - hyper::Version::HTTP_09 => Ok(http_types::Version::Http0_9), - hyper::Version::HTTP_10 => Ok(http_types::Version::Http1_0), - hyper::Version::HTTP_11 => Ok(http_types::Version::Http1_1), - hyper::Version::HTTP_2 => Ok(http_types::Version::Http2_0), - hyper::Version::HTTP_3 => Ok(http_types::Version::Http3_0), - _ => Err(Error::from_str( - StatusCode::BadGateway, - "unrecognized HTTP response version", - )), - }?; - let body = match body.data().await { None => None, Some(Ok(b)) => Some(b), @@ -164,8 +116,8 @@ impl HttpTypesResponse { .map(|b| http_types::Body::from_bytes(b.to_vec())) .unwrap_or(http_types::Body::empty()); - let mut res = Response::new(status); - res.set_version(Some(version)); + let mut res = Response::new(parts.status); + res.set_version(Some(parts.version.into())); for (name, value) in parts.headers { let value = value.as_bytes().to_owned();