Skip to content

Commit

Permalink
test execute
Browse files Browse the repository at this point in the history
  • Loading branch information
staszek-krotki committed Jan 15, 2024
1 parent ccf8c67 commit 6e07db5
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 18 deletions.
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion core/activity/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ async fn proxy_http_request(
.call_streaming(msg)
};

let stream = invoke(body, method, path_activity_url.url, call_streaming);
let gsb_call = GsbHttpCall {
method: method.to_string(),
path: path_activity_url.url,
body,
};
let stream = gsb_call.invoke(call_streaming);

Ok(HttpResponse::Ok()
.keep_alive()
Expand Down
5 changes: 4 additions & 1 deletion exe-unit/components/http-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ tokio = { version = "1.35.0", features = ["full"] }
reqwest = { version = "0.11.23", features = ["json"] }
log = { version = "0.4.20", features = [] }
async-stream = "0.3.5"
futures = { version = "0.3", features = [] }
futures = { version = "0.3", features = [] }
actix-web = "4"
actix-http = "3"
mockito = "1.2.0"
63 changes: 47 additions & 16 deletions exe-unit/components/http-proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use actix_http::body::MessageBody;
use async_stream::stream;
use chrono::Utc;
use futures::prelude::*;
Expand Down Expand Up @@ -105,20 +106,15 @@ impl GsbHttpCall {
}
};

stream
Box::pin(stream)
}

pub fn invoke<T, F>(
&self,
trigger_stream: F,
) -> impl Stream<Item = Result<String, Error>> + Unpin + Sized
) -> impl Stream<Item = Result<actix_web::web::Bytes, Error>> + Unpin + Sized
where
T: Stream<
Item = Result<
Result<GsbHttpCallEvent, HttpProxyStatusError>,
ya_service_bus::Error,
>,
> + Unpin,
T: Stream<Item = Result<Result<GsbHttpCallEvent, HttpProxyStatusError>, Error>> + Unpin,
F: FnOnce(GsbHttpCall) -> T,
{
let path = if let Some(stripped_url) = self.path.strip_prefix('/') {
Expand All @@ -135,25 +131,57 @@ impl GsbHttpCall {

let stream = trigger_stream(msg);

stream
let stream = stream
.map(|item| item.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e))))
.map(move |result| {
let msg = match result {
Ok(r) => r.msg,
Err(e) => format!("Error {}", e),
Ok(r) => actix_web::web::Bytes::from(r.msg),
Err(e) => actix_web::web::Bytes::from(format!("Error {}", e)),
};
Ok::<String, Error>(msg)
})
msg.try_into_bytes().map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
})
});
Box::pin(stream)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::GsbHttpCall;
use mockito;
use tokio::pin;

#[tokio::test]
#[actix_web::test]
async fn gsb_proxy_execute() {
// Mock server
let mut server = mockito::Server::new();
let url = server.url();

server
.mock("GET", "/endpoint")
.with_status(201)
.with_body("response")
.create();

let mut gsb_call = GsbHttpCall {
method: "GET".to_string(),
path: "/endpoint".to_string(),
body: None,
};

let mut response_stream = gsb_call.execute(url);

let mut v = vec![];
while let Some(event) = response_stream.next().await {
v.push(event.msg);
}

assert_eq!(vec!["response"], v);
}

#[actix_web::test]
async fn gsb_proxy_invoke() {
let gsb_call = GsbHttpCall {
method: "GET".to_string(),
Expand All @@ -166,7 +194,7 @@ mod tests {
let event = GsbHttpCallEvent {
index: i,
timestamp: "timestamp".to_string(),
msg: "response".to_string()
msg: format!("response {}", i)
};
let result = Ok(event);
yield Ok(result)
Expand All @@ -175,10 +203,13 @@ mod tests {
pin!(stream);
let mut response_stream = gsb_call.invoke(|_| stream);

let mut v = vec![];
while let Some(event) = response_stream.next().await {
if let Ok(event) = event {
assert_eq!("response", event);
v.push(event);
}
}

assert_eq!(vec!["response 0", "response 1", "response 2"], v);
}
}

0 comments on commit 6e07db5

Please sign in to comment.