diff --git a/Cargo.lock b/Cargo.lock index f6ea5a2ea3..35aec12e27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -557,6 +557,16 @@ dependencies = [ "nom 2.2.1", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "assert_cmd" version = "2.0.13" @@ -1215,6 +1225,16 @@ dependencies = [ "cc", ] +[[package]] +name = "colored" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" +dependencies = [ + "lazy_static", + "windows-sys 0.48.0", +] + [[package]] name = "console" version = "0.10.3" @@ -2833,11 +2853,14 @@ dependencies = [ name = "gsb-http-proxy" version = "0.1.0" dependencies = [ + "actix-http", + "actix-web", "async-stream", "chrono", "futures 0.3.30", "http 1.0.0", "log", + "mockito", "reqwest", "serde", "serde_json", @@ -3962,6 +3985,25 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "mockito" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b" +dependencies = [ + "assert-json-diff", + "colored", + "futures 0.3.30", + "hyper", + "log", + "rand 0.8.5", + "regex", + "serde_json", + "serde_urlencoded", + "similar", + "tokio", +] + [[package]] name = "multimap" version = "0.8.3" @@ -6359,6 +6401,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "similar" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32fea41aca09ee824cc9724996433064c89f7777e60762749a4170a14abbfa21" + [[package]] name = "sized-chunks" version = "0.6.5" diff --git a/core/activity/src/http_proxy.rs b/core/activity/src/http_proxy.rs index 7cc76127bd..7ff5f65d58 100644 --- a/core/activity/src/http_proxy.rs +++ b/core/activity/src/http_proxy.rs @@ -76,7 +76,12 @@ async fn proxy_http_request( .call_streaming(msg) }; - let stream = invoke(body, method, path_activity_url.url, call_streaming); + let gsb_call = GsbHttpCall { + method: method.to_string(), + path: path_activity_url.url, + body, + }; + let stream = gsb_call.invoke(call_streaming); Ok(HttpResponse::Ok() .keep_alive() diff --git a/exe-unit/components/http-proxy/Cargo.toml b/exe-unit/components/http-proxy/Cargo.toml index 52b08bede2..25c774d143 100644 --- a/exe-unit/components/http-proxy/Cargo.toml +++ b/exe-unit/components/http-proxy/Cargo.toml @@ -17,4 +17,7 @@ tokio = { version = "1.35.0", features = ["full"] } reqwest = { version = "0.11.23", features = ["json"] } log = { version = "0.4.20", features = [] } async-stream = "0.3.5" -futures = { version = "0.3", features = [] } \ No newline at end of file +futures = { version = "0.3", features = [] } +actix-web = "4" +actix-http = "3" +mockito = "1.2.0" \ No newline at end of file diff --git a/exe-unit/components/http-proxy/src/lib.rs b/exe-unit/components/http-proxy/src/lib.rs index 4e67c093f1..e4f337d68a 100644 --- a/exe-unit/components/http-proxy/src/lib.rs +++ b/exe-unit/components/http-proxy/src/lib.rs @@ -1,3 +1,4 @@ +use actix_http::body::MessageBody; use async_stream::stream; use chrono::Utc; use futures::prelude::*; @@ -105,20 +106,15 @@ impl GsbHttpCall { } }; - stream + Box::pin(stream) } pub fn invoke( &self, trigger_stream: F, - ) -> impl Stream> + Unpin + Sized + ) -> impl Stream> + Unpin + Sized where - T: Stream< - Item = Result< - Result, - ya_service_bus::Error, - >, - > + Unpin, + T: Stream, Error>> + Unpin, F: FnOnce(GsbHttpCall) -> T, { let path = if let Some(stripped_url) = self.path.strip_prefix('/') { @@ -135,15 +131,18 @@ impl GsbHttpCall { let stream = trigger_stream(msg); - stream + let stream = stream .map(|item| item.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e)))) .map(move |result| { let msg = match result { - Ok(r) => r.msg, - Err(e) => format!("Error {}", e), + Ok(r) => actix_web::web::Bytes::from(r.msg), + Err(e) => actix_web::web::Bytes::from(format!("Error {}", e)), }; - Ok::(msg) - }) + msg.try_into_bytes().map_err(|_| { + Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string()) + }) + }); + Box::pin(stream) } } @@ -151,9 +150,38 @@ impl GsbHttpCall { mod tests { use super::*; use crate::GsbHttpCall; + use mockito; use tokio::pin; - #[tokio::test] + #[actix_web::test] + async fn gsb_proxy_execute() { + // Mock server + let mut server = mockito::Server::new(); + let url = server.url(); + + server + .mock("GET", "/endpoint") + .with_status(201) + .with_body("response") + .create(); + + let mut gsb_call = GsbHttpCall { + method: "GET".to_string(), + path: "/endpoint".to_string(), + body: None, + }; + + let mut response_stream = gsb_call.execute(url); + + let mut v = vec![]; + while let Some(event) = response_stream.next().await { + v.push(event.msg); + } + + assert_eq!(vec!["response"], v); + } + + #[actix_web::test] async fn gsb_proxy_invoke() { let gsb_call = GsbHttpCall { method: "GET".to_string(), @@ -166,7 +194,7 @@ mod tests { let event = GsbHttpCallEvent { index: i, timestamp: "timestamp".to_string(), - msg: "response".to_string() + msg: format!("response {}", i) }; let result = Ok(event); yield Ok(result) @@ -175,10 +203,13 @@ mod tests { pin!(stream); let mut response_stream = gsb_call.invoke(|_| stream); + let mut v = vec![]; while let Some(event) = response_stream.next().await { if let Ok(event) = event { - assert_eq!("response", event); + v.push(event); } } + + assert_eq!(vec!["response 0", "response 1", "response 2"], v); } }