Skip to content

Commit

Permalink
🕳 Implement finish semantics for streaming bodies (#203)
Browse files Browse the repository at this point in the history
* 🕳 Implement `finish` semantics for streaming bodies

Moving forward, streaming bodies must be affirmatively finished when Wasm programs are finished
writing to them. This helps prevent unexpected control flows due to error handling causing an
incomplete `transfer-encoding: chunked` streaming body from appearing complete to its recipient.
  • Loading branch information
acfoltzer authored Jan 18, 2023
1 parent ec6d3f2 commit c1fafea
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cli/tests/integration/sending_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn responses_can_be_streamed_downstream() -> TestResult {
// accumulate the entire body to a vector
let mut body = Vec::new();
while let Some(chunk) = resp.data().await {
body.extend_from_slice(&chunk?);
body.extend_from_slice(&chunk.unwrap());
}

// work with the body as a string, breaking it into lines
Expand Down
22 changes: 14 additions & 8 deletions lib/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Body type, for request and response bodies.
use {
crate::error,
crate::{error, streaming_body::StreamingBodyItem, Error},
bytes::{BufMut, BytesMut},
flate2::write::GzDecoder,
futures::pin_mut,
Expand Down Expand Up @@ -36,7 +36,7 @@ pub enum Chunk {
/// Since the channel yields chunks, this variant represents a *stream* of chunks rather than
/// one individual chunk. That stream is effectively "flattened" on-demand, as the `Body`
/// containing it is read.
Channel(mpsc::Receiver<Chunk>),
Channel(mpsc::Receiver<StreamingBodyItem>),
/// A version of `HttpBody` that assumes that the interior data is gzip-compressed.
CompressedHttpBody(DecoderState, hyper::Body),
}
Expand Down Expand Up @@ -72,8 +72,8 @@ impl From<hyper::Body> for Chunk {
}
}

impl From<mpsc::Receiver<Chunk>> for Chunk {
fn from(chan: mpsc::Receiver<Chunk>) -> Self {
impl From<mpsc::Receiver<StreamingBodyItem>> for Chunk {
fn from(chan: mpsc::Receiver<StreamingBodyItem>) -> Self {
Chunk::Channel(chan)
}
}
Expand Down Expand Up @@ -209,11 +209,10 @@ impl HttpBody for Body {
return Poll::Pending;
}
Poll::Ready(None) => {
// no more chunks from this stream, so continue the loop now that it's been
// popped
continue;
// the channel completed without a Finish message, so yield an error
return Poll::Ready(Some(Err(Error::UnfinishedStreamingBody)));
}
Poll::Ready(Some(chunk)) => {
Poll::Ready(Some(StreamingBodyItem::Chunk(chunk))) => {
// put the channel back first, so we can poll it again after the chunk it
// just yielded
self.chunks.push_front(receiver.into());
Expand All @@ -222,6 +221,13 @@ impl HttpBody for Body {
self.chunks.push_front(chunk);
continue;
}
Poll::Ready(Some(StreamingBodyItem::Finished)) => {
// it shouldn't be possible for any more chunks to arrive on this
// channel, but just in case we won't try to read them; dropping the
// receiver means we won't hit the `Ready(None)` case above that
// indicates an unfinished streaming body
continue;
}
}
}
Chunk::CompressedHttpBody(ref mut decoder_state, ref mut body) => {
Expand Down
6 changes: 5 additions & 1 deletion lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ pub enum Error {

#[error("Invalid Object Store `key` value used: {0}.")]
ObjectStoreKeyValidationError(#[from] crate::object_store::KeyValidationError),

#[error("Unfinished streaming body")]
UnfinishedStreamingBody,
}

impl Error {
Expand Down Expand Up @@ -169,7 +172,8 @@ impl Error {
| Error::BackendNameRegistryError(_)
| Error::HttpError(_)
| Error::UnknownObjectStore(_)
| Error::ObjectStoreKeyValidationError(_) => FastlyStatus::Error,
| Error::ObjectStoreKeyValidationError(_)
| Error::UnfinishedStreamingBody => FastlyStatus::Error,
}
}

Expand Down
54 changes: 51 additions & 3 deletions lib/src/streaming_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,39 @@ const STREAMING_CHANNEL_SIZE: usize = 8;
/// The corresponding "read end" can be found in the [`Chunk`] type.
#[derive(Debug)]
pub struct StreamingBody {
sender: mpsc::Sender<Chunk>,
sender: mpsc::Sender<StreamingBodyItem>,
}

/// The items sent over the `StreamingBody` channel.
///
/// These are either a [`Chunk`] corresponding to a write, or else a "finish" message. The purpose
/// of the finish message is to ensure that we don't accidentally make incomplete messages appear
/// complete.
///
/// If the streaming body is associated with a `content-length` request or response, the finish
/// message is largely meaningless, as the content length provides the necessary framing information
/// required for recipients to recognize an incomplete message.
///
/// The situation is more delicate with `transfer-encoding: chunked` requests and responses. In
/// these cases, `hyper` will dutifully frame each chunk as it reads them from the `Body`. If the
/// `Body` suddenly returns `Ok(None)`, it will apply the proper `0\r\n\r\n` termination to the
/// message. The finish message ensures that this will only happen when the Wasm program
/// affirmatively marks the body as finished.
#[derive(Debug)]
pub enum StreamingBodyItem {
/// A piece of body data, corresponding to a single write to the streaming body.
Chunk(Chunk),
/// The "finish" message: the writer has explicitly marked the body as complete.
Finished,
}

impl From<Chunk> for StreamingBodyItem {
fn from(chunk: Chunk) -> Self {
Self::Chunk(chunk)
}
}

impl StreamingBody {
/// Create a new channel for streaming a body, returning write and read ends as a pair.
pub fn new() -> (StreamingBody, mpsc::Receiver<Chunk>) {
pub fn new() -> (StreamingBody, mpsc::Receiver<StreamingBodyItem>) {
let (sender, receiver) = mpsc::channel(STREAMING_CHANNEL_SIZE);
(StreamingBody { sender }, receiver)
}
Expand All @@ -29,7 +56,7 @@ impl StreamingBody {
/// sending, e.g. due to the receive end being closed.
pub async fn send_chunk(&mut self, chunk: impl Into<Chunk>) -> Result<(), Error> {
self.sender
.send(chunk.into())
.send(Chunk::from(chunk.into()).into())
.await
.map_err(|_| Error::StreamingChunkSend)
}
Expand All @@ -38,4 +65,25 @@ impl StreamingBody {
pub async fn await_ready(&mut self) {
    // Only the act of waiting matters here: whatever `reserve` yields (success or failure) is
    // discarded immediately rather than being held on to.
    drop(self.sender.reserve().await);
}

/// Mark this streaming body as finished, so that it will be terminated correctly.
///
/// This is important primarily for `Transfer-Encoding: chunked` bodies where a premature close
/// is only noticed if the chunked encoding is not properly terminated.
pub fn finish(self) -> Result<(), Error> {
match self.sender.try_send(StreamingBodyItem::Finished) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Closed(_)) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => {
// If the channel is full, maybe the other end is just taking a while to receive all
// the bytes. Spawn a task that will send a `finish` message as soon as there's room
// in the channel.
let sender = self.sender.clone();
tokio::task::spawn(async move {
let _ = sender.send(StreamingBodyItem::Finished).await;
});
Ok(())
}
}
}
}
8 changes: 6 additions & 2 deletions lib/src/wiggle_abi/body_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ impl FastlyHttpBody for Session {

fn close(&mut self, body_handle: BodyHandle) -> Result<(), Error> {
// Drop the body and pass up an error if the handle does not exist
self.drop_body(body_handle)?;
Ok(())
if self.is_streaming_body(body_handle) {
// Make sure a streaming body gets a `finish` message
self.take_streaming_body(body_handle)?.finish()
} else {
Ok(self.drop_body(body_handle)?)
}
}
}
72 changes: 33 additions & 39 deletions test-fixtures/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions test-fixtures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ license = "Apache-2.0 WITH LLVM-exception"
publish = false

[dependencies]
fastly = "=0.8.9"
fastly-shared = "=0.8.9"
fastly-sys = "=0.8.9"
fastly = "^0.9.1"
fastly-shared = "^0.9.1"
fastly-sys = "^0.9.1"
bytes = "1.0.0"
http = "0.2.1"
serde = "1.0.114"
2 changes: 2 additions & 0 deletions test-fixtures/src/bin/gzipped-response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ fn main() -> Result<(), SendError> {
streaming_body.write_bytes(tiny_bit);
}

streaming_body.finish().unwrap();

pending_req
};
let mut unpacked_stream_async = unpacked_stream_pending.wait()?;
Expand Down
2 changes: 2 additions & 0 deletions test-fixtures/src/bin/streaming-response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ fn main() {
for i in 0..1000 {
writeln!(stream, "{}", i).unwrap();
}

stream.finish().unwrap();
}
2 changes: 1 addition & 1 deletion test-fixtures/src/bin/upstream-streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ fn main() {
writeln!(stream, "{}", i).unwrap();
}

drop(stream);
stream.finish().unwrap();
req.wait().unwrap().send_to_client();
}

0 comments on commit c1fafea

Please sign in to comment.