diff --git a/cli/tests/integration/sending_response.rs b/cli/tests/integration/sending_response.rs index e18157e5..4846c8db 100644 --- a/cli/tests/integration/sending_response.rs +++ b/cli/tests/integration/sending_response.rs @@ -72,7 +72,7 @@ async fn responses_can_be_streamed_downstream() -> TestResult { // accumulate the entire body to a vector let mut body = Vec::new(); while let Some(chunk) = resp.data().await { - body.extend_from_slice(&chunk?); + body.extend_from_slice(&chunk.unwrap()); } // work with the body as a string, breaking it into lines diff --git a/lib/src/body.rs b/lib/src/body.rs index e26186f7..bc9d26ac 100644 --- a/lib/src/body.rs +++ b/lib/src/body.rs @@ -1,7 +1,7 @@ //! Body type, for request and response bodies. use { - crate::error, + crate::{error, streaming_body::StreamingBodyItem, Error}, bytes::{BufMut, BytesMut}, flate2::write::GzDecoder, futures::pin_mut, @@ -36,7 +36,7 @@ pub enum Chunk { /// Since the channel yields chunks, this variant represents a *stream* of chunks rather than /// one individual chunk. That stream is effectively "flattened" on-demand, as the `Body` /// containing it is read. - Channel(mpsc::Receiver), + Channel(mpsc::Receiver), /// A version of `HttpBody` that assumes that the interior data is gzip-compressed. CompressedHttpBody(DecoderState, hyper::Body), } @@ -72,8 +72,8 @@ impl From for Chunk { } } -impl From> for Chunk { - fn from(chan: mpsc::Receiver) -> Self { +impl From> for Chunk { + fn from(chan: mpsc::Receiver) -> Self { Chunk::Channel(chan) } } @@ -209,11 +209,10 @@ impl HttpBody for Body { return Poll::Pending; } Poll::Ready(None) => { - // no more chunks from this stream, so continue the loop now that it's been - // popped - continue; + // the channel completed without a Finish message, so yield an error + return Poll::Ready(Some(Err(Error::UnfinishedStreamingBody))); } - Poll::Ready(Some(chunk)) => { + Poll::Ready(Some(StreamingBodyItem::Chunk(chunk))) => { // put the channel back first, so we can poll it again after the chunk it // just yielded self.chunks.push_front(receiver.into()); @@ -222,6 +221,13 @@ impl HttpBody for Body { self.chunks.push_front(chunk); continue; } + Poll::Ready(Some(StreamingBodyItem::Finished)) => { + // it shouldn't be possible for any more chunks to arrive on this + // channel, but just in case we won't try to read them; dropping the + // receiver means we won't hit the `Ready(None)` case above that + // indicates an unfinished streaming body + continue; + } } } Chunk::CompressedHttpBody(ref mut decoder_state, ref mut body) => { diff --git a/lib/src/error.rs b/lib/src/error.rs index f6729c5b..0542e9f9 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -118,6 +118,9 @@ pub enum Error { #[error("Invalid Object Store `key` value used: {0}.")] ObjectStoreKeyValidationError(#[from] crate::object_store::KeyValidationError), + + #[error("Unfinished streaming body")] + UnfinishedStreamingBody, } impl Error { @@ -169,7 +172,8 @@ impl Error { | Error::BackendNameRegistryError(_) | Error::HttpError(_) | Error::UnknownObjectStore(_) - | Error::ObjectStoreKeyValidationError(_) => FastlyStatus::Error, + | Error::ObjectStoreKeyValidationError(_) + | Error::UnfinishedStreamingBody => FastlyStatus::Error, } } diff --git a/lib/src/streaming_body.rs b/lib/src/streaming_body.rs index 78c3d427..fd6d882c 100644 --- a/lib/src/streaming_body.rs +++ b/lib/src/streaming_body.rs @@ -13,12 +13,39 @@ const STREAMING_CHANNEL_SIZE: usize = 8; /// The corresponding "read end" can be found in the [`Chunk`] type. #[derive(Debug)] pub struct StreamingBody { - sender: mpsc::Sender, + sender: mpsc::Sender, +} + +/// The items sent over the `StreamingBody` channel. +/// +/// These are either a [`Chunk`] corresponding to a write, or else a "finish" message. The purpose +/// of the finish message is to ensure that we don't accidentally make incomplete messages appear +/// complete. +/// +/// If the streaming body is associated with a `content-length` request or response, the finish +/// message is largely meaningless, as the content length provides the necessary framing information +/// required for recipients to recognize an incomplete message. +/// +/// The situation is more delicate with `transfer-encoding: chunked` requests and responses. In +/// these cases, `hyper` will dutifully frame each chunk as it reads them from the `Body`. If the +/// `Body` suddenly returns `Ok(None)`, it will apply the proper `0\r\n\r\n` termination to the +/// message. The finish message ensures that this will only happen when the Wasm program +/// affirmitavely marks the body as finished. +#[derive(Debug)] +pub enum StreamingBodyItem { + Chunk(Chunk), + Finished, +} + +impl From for StreamingBodyItem { + fn from(chunk: Chunk) -> Self { + Self::Chunk(chunk) + } } impl StreamingBody { /// Create a new channel for streaming a body, returning write and read ends as a pair. - pub fn new() -> (StreamingBody, mpsc::Receiver) { + pub fn new() -> (StreamingBody, mpsc::Receiver) { let (sender, receiver) = mpsc::channel(STREAMING_CHANNEL_SIZE); (StreamingBody { sender }, receiver) } @@ -29,7 +56,7 @@ impl StreamingBody { /// sending, e.g. due to the receive end being closed. pub async fn send_chunk(&mut self, chunk: impl Into) -> Result<(), Error> { self.sender - .send(chunk.into()) + .send(Chunk::from(chunk.into()).into()) .await .map_err(|_| Error::StreamingChunkSend) } @@ -38,4 +65,25 @@ impl StreamingBody { pub async fn await_ready(&mut self) { let _ = self.sender.reserve().await; } + + /// Mark this streaming body as finished, so that it will be terminated correctly. + /// + /// This is important primarily for `Transfer-Encoding: chunked` bodies where a premature close + /// is only noticed if the chunked encoding is not properly terminated. + pub fn finish(self) -> Result<(), Error> { + match self.sender.try_send(StreamingBodyItem::Finished) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Closed(_)) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + // If the channel is full, maybe the other end is just taking a while to receive all + // the bytes. Spawn a task that will send a `finish` message as soon as there's room + // in the channel. + let sender = self.sender.clone(); + tokio::task::spawn(async move { + let _ = sender.send(StreamingBodyItem::Finished).await; + }); + Ok(()) + } + } + } } diff --git a/lib/src/wiggle_abi/body_impl.rs b/lib/src/wiggle_abi/body_impl.rs index 454b9530..54313403 100644 --- a/lib/src/wiggle_abi/body_impl.rs +++ b/lib/src/wiggle_abi/body_impl.rs @@ -101,7 +101,11 @@ impl FastlyHttpBody for Session { fn close(&mut self, body_handle: BodyHandle) -> Result<(), Error> { // Drop the body and pass up an error if the handle does not exist - self.drop_body(body_handle)?; - Ok(()) + if self.is_streaming_body(body_handle) { + // Make sure a streaming body gets a `finish` message + self.take_streaming_body(body_handle)?.finish() + } else { + Ok(self.drop_body(body_handle)?) + } } } diff --git a/test-fixtures/Cargo.lock b/test-fixtures/Cargo.lock index 7a8b79ce..9e23fdcd 100644 --- a/test-fixtures/Cargo.lock +++ b/test-fixtures/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "anyhow" -version = "1.0.66" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" +checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" [[package]] name = "bitflags" @@ -61,9 +61,9 @@ dependencies = [ [[package]] name = "fastly" -version = "0.8.9" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d14ee2f12a3191582449a2e080287eff8898fc52da61c437869d91c41b3ae2a" +checksum = "645e7a0c3782166bcab2c7f0704d51db0fed2d27093d5beea94cef878ba11bb9" dependencies = [ "anyhow", "bytes 0.5.6", @@ -85,9 +85,9 @@ dependencies = [ [[package]] name = "fastly-macros" -version = "0.8.9" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d1b2980c09148cb84bfd18591943ee449a350dc1ff1ead69edbf012f7e2eef" +checksum = "6b5163881fe9bab8e865258351e931635d2d17ed88113e546db7c7e57f7249e9" dependencies = [ "proc-macro2", "quote", @@ -96,9 +96,9 @@ dependencies = [ [[package]] name = "fastly-shared" -version = "0.8.9" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f79939bdbbab8b1de759d584f386319454f8623681400675a07f9ad35f2b158" +checksum = "40d95b3a3816d17b02f3972d16fefddd164c0ec7c1b4e090b380177131a4a22d" dependencies = [ "bitflags", "http", @@ -107,9 +107,9 @@ dependencies = [ [[package]] name = "fastly-sys" -version = "0.8.9" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d01e98274a6b12c85c0eef22924871cc12c54b5322268947322463e59d09336" +checksum = "286a15f3231704bfadc3e2a739b04fdd6fadfa4706157d482f05c0539c681594" dependencies = [ "bitflags", "fastly-shared", @@ -175,9 +175,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.137" +version = "0.2.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" +checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c" [[package]] name = "mime" @@ -185,6 +185,15 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "opaque-debug" version = "0.3.0" @@ -223,18 +232,18 @@ checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" [[package]] name = "serde" -version = "1.0.147" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.147" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", @@ -243,9 +252,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.87" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" +checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" dependencies = [ "itoa", "ryu", @@ -279,9 +288,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.103" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" +checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" dependencies = [ "proc-macro2", "quote", @@ -322,28 +331,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.17" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c" dependencies = [ + "libc", + "num_threads", "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" - -[[package]] -name = "time-macros" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" -dependencies = [ - "time-core", ] [[package]] diff --git a/test-fixtures/Cargo.toml b/test-fixtures/Cargo.toml index bdd23455..832b54ff 100644 --- a/test-fixtures/Cargo.toml +++ b/test-fixtures/Cargo.toml @@ -8,9 +8,9 @@ license = "Apache-2.0 WITH LLVM-exception" publish = false [dependencies] -fastly = "=0.8.9" -fastly-shared = "=0.8.9" -fastly-sys = "=0.8.9" +fastly = "^0.9.1" +fastly-shared = "^0.9.1" +fastly-sys = "^0.9.1" bytes = "1.0.0" http = "0.2.1" serde = "1.0.114" diff --git a/test-fixtures/src/bin/gzipped-response.rs b/test-fixtures/src/bin/gzipped-response.rs index a338dc3b..837a224f 100644 --- a/test-fixtures/src/bin/gzipped-response.rs +++ b/test-fixtures/src/bin/gzipped-response.rs @@ -87,6 +87,8 @@ fn main() -> Result<(), SendError> { streaming_body.write_bytes(tiny_bit); } + streaming_body.finish().unwrap(); + pending_req }; let mut unpacked_stream_async = unpacked_stream_pending.wait()?; diff --git a/test-fixtures/src/bin/streaming-response.rs b/test-fixtures/src/bin/streaming-response.rs index f21fd63f..a65e9f19 100644 --- a/test-fixtures/src/bin/streaming-response.rs +++ b/test-fixtures/src/bin/streaming-response.rs @@ -7,4 +7,6 @@ fn main() { for i in 0..1000 { writeln!(stream, "{}", i).unwrap(); } + + stream.finish().unwrap(); } diff --git a/test-fixtures/src/bin/upstream-streaming.rs b/test-fixtures/src/bin/upstream-streaming.rs index 60d3e3b1..a61462af 100644 --- a/test-fixtures/src/bin/upstream-streaming.rs +++ b/test-fixtures/src/bin/upstream-streaming.rs @@ -10,6 +10,6 @@ fn main() { writeln!(stream, "{}", i).unwrap(); } - drop(stream); + stream.finish().unwrap(); req.wait().unwrap().send_to_client(); }