From c6cd67b3f4cb8174eea1d0c8f36a6a5dcbb71b67 Mon Sep 17 00:00:00 2001 From: Mohammad Nejati Date: Sat, 4 Jan 2025 06:59:00 +0000 Subject: [PATCH] parser accepts elastic and sink bodies --- include/boost/http_proto/file_body.hpp | 10 +- include/boost/http_proto/parser.hpp | 49 +- src/file_body.cpp | 26 +- src/parser.cpp | 1316 +++++++++++------------- test/unit/parser.cpp | 65 +- test/unit/zlib.cpp | 236 +++-- 6 files changed, 856 insertions(+), 846 deletions(-) diff --git a/include/boost/http_proto/file_body.hpp b/include/boost/http_proto/file_body.hpp index 8eae7b3c..a42e5ef3 100644 --- a/include/boost/http_proto/file_body.hpp +++ b/include/boost/http_proto/file_body.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -20,7 +21,7 @@ namespace http_proto { class BOOST_SYMBOL_VISIBLE file_body - : public source + : public source, public sink { file f_; std::uint64_t n_; @@ -45,9 +46,14 @@ class BOOST_SYMBOL_VISIBLE std::uint64_t(-1)) noexcept; BOOST_HTTP_PROTO_DECL - results + source::results on_read( buffers::mutable_buffer b) override; + + BOOST_HTTP_PROTO_DECL + sink::results + on_write( + buffers::const_buffer b, bool more) override; }; } // http_proto diff --git a/include/boost/http_proto/parser.hpp b/include/boost/http_proto/parser.hpp index 7e0473cd..37d7255d 100644 --- a/include/boost/http_proto/parser.hpp +++ b/include/boost/http_proto/parser.hpp @@ -12,24 +12,23 @@ #define BOOST_HTTP_PROTO_PARSER_HPP #include -#include -#include -#include #include #include #include +#include +#include +#include + +#include #include #include #include #include #include -#include #include + #include #include -#include -#include -#include namespace boost { namespace http_proto { @@ -185,7 +184,7 @@ class BOOST_SYMBOL_VISIBLE bool is_complete() const noexcept { - return st_ == state::complete; + return st_ >= state::complete_in_place; } /** Returns `true` if the end of the stream was reached. @@ -209,8 +208,7 @@ class BOOST_SYMBOL_VISIBLE { return st_ == state::reset || - ( st_ == state::complete && - got_eof_); + (st_ >= state::complete_in_place && got_eof_); } //-------------------------------------------- @@ -366,30 +364,42 @@ class BOOST_SYMBOL_VISIBLE friend class response_parser; detail::header const* - safe_get_header() const; - bool is_plain() const noexcept; - void on_headers(system::error_code&); - BOOST_HTTP_PROTO_DECL void on_set_body(); - void init_dynamic(system::error_code&); + safe_get_header() const; + + bool + is_plain() const noexcept; + + void + on_headers(system::error_code&); + + BOOST_HTTP_PROTO_DECL + void + on_set_body(); + + std::size_t + apply_filter( + system::error_code&, + std::size_t, + bool); static constexpr unsigned buffers_N = 8; enum class state { - // order matters reset, start, header, body, set_body, + complete_in_place, complete }; enum class how { in_place, + sink, elastic, - sink }; context& ctx_; @@ -397,7 +407,7 @@ class BOOST_SYMBOL_VISIBLE detail::workspace ws_; detail::header h_; - std::uint64_t body_avail_ = 0; + std::size_t body_avail_ = 0; std::uint64_t body_total_ = 0; std::uint64_t payload_remain_ = 0; std::uint64_t chunk_remain_ = 0; @@ -421,7 +431,6 @@ class BOOST_SYMBOL_VISIBLE // `const_buffers_type` from relevant functions buffers::const_buffer_pair cbp_; - buffers::circular_buffer* body_buf_ = nullptr; buffers::any_dynamic_buffer* eb_ = nullptr; detail::filter* filter_ = nullptr; sink* sink_ = nullptr; @@ -429,10 +438,10 @@ class BOOST_SYMBOL_VISIBLE state st_ = state::start; how how_ = how::in_place; bool got_eof_ = false; -// bool need_more_; bool head_response_ = false; bool needs_chunk_close_ = false; bool trailer_headers_ = false; + bool chunked_body_ended = false; }; //------------------------------------------------ diff --git a/src/file_body.cpp b/src/file_body.cpp index 3ed44739..f333008b 100644 --- a/src/file_body.cpp +++ b/src/file_body.cpp @@ -34,9 +34,9 @@ auto file_body:: on_read( buffers::mutable_buffer b) -> - results + source::results { - results rv; + source::results rv; if(n_ > 0) { std::size_t n; @@ -53,5 +53,27 @@ on_read( return rv; } +auto +file_body:: +on_write( + buffers::const_buffer b, bool) -> + sink::results +{ + sink::results rv; + if(n_ > 0) + { + std::size_t n; + if( n_ >= b.size()) + n = b.size(); + else + n = static_cast(n_); + n = f_.write( + b.data(), n, rv.ec); + rv.bytes = n; + n_ -= n; + } + return rv; +} + } // http_proto } // boost diff --git a/src/parser.cpp b/src/parser.cpp index 0b781bd9..e45a1809 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -15,21 +15,15 @@ #include #include -#include "detail/filter.hpp" - +#include #include #include #include #include - #include #include -#include - -#include -#include -#include +#include "detail/filter.hpp" namespace boost { namespace http_proto { @@ -346,7 +340,10 @@ skip_trailer_headers( template std::size_t -clamp(UInt x, std::size_t limit) noexcept +clamp( + UInt x, + std::size_t limit = std::numeric_limits< + std::size_t>::max()) noexcept { if(x >= limit) return limit; @@ -528,59 +525,54 @@ start_impl( // current message is incomplete detail::throw_logic_error(); + case state::complete_in_place: + // remove available body. + if(is_plain()) + cb0_.consume(body_avail_); + BOOST_FALLTHROUGH; + case state::complete: { - // remove partial body. - if(is_plain() && (how_ == how::in_place)) - cb0_.consume( - static_cast(body_avail_)); - - if(cb0_.size() > 0) - { - // move unused octets to front + // move leftovers to front - ws_.clear(); - leftover = cb0_.size(); + ws_.clear(); + leftover = cb0_.size(); - auto* dest = reinterpret_cast(ws_.data()); - auto cbp = cb0_.data(); - auto* a = static_cast(cbp[0].data()); - auto* b = static_cast(cbp[1].data()); - auto an = cbp[0].size(); - auto bn = cbp[1].size(); + auto* dest = reinterpret_cast(ws_.data()); + auto cbp = cb0_.data(); + auto* a = static_cast(cbp[0].data()); + auto* b = static_cast(cbp[1].data()); + auto an = cbp[0].size(); + auto bn = cbp[1].size(); - if(bn == 0) - { - std::memmove(dest, a, an); - } - else - { - // if `a` can fit between `dest` and `b`, shift `b` to the left - // and copy `a` to its position. if `a` fits perfectly, the - // shift will be of size 0. - // if `a` requires more space, shift `b` to the right and - // copy `a` to its position. this process may require multiple - // iterations and should be done chunk by chunk to prevent `b` - // from overlapping with `a`. - do - { - // clamp right shifts to prevent overlap with `a` - auto* bp = (std::min)(dest + an, const_cast(a) - bn); - b = static_cast(std::memmove(bp, b, bn)); - - // a chunk or all of `a` based on available space - auto chunk_a = static_cast(b - dest); - std::memcpy(dest, a, chunk_a); // never overlap - an -= chunk_a; - dest += chunk_a; - a += chunk_a; - } while(an); - } + if(bn == 0) + { + std::memmove(dest, a, an); } else { - // leftover data after body + // if `a` can fit between `dest` and `b`, shift `b` to the left + // and copy `a` to its position. if `a` fits perfectly, the + // shift will be of size 0. + // if `a` requires more space, shift `b` to the right and + // copy `a` to its position. this process may require multiple + // iterations and should be done chunk by chunk to prevent `b` + // from overlapping with `a`. + do + { + // clamp right shifts to prevent overlap with `a` + auto* bp = (std::min)(dest + an, const_cast(a) - bn); + b = static_cast(std::memmove(bp, b, bn)); + + // a chunk or all of `a` based on available space + auto chunk_a = static_cast(b - dest); + std::memcpy(dest, a, chunk_a); // never overlap + an -= chunk_a; + dest += chunk_a; + a += chunk_a; + } while(an); } + break; } } @@ -589,16 +581,14 @@ start_impl( fb_ = { ws_.data(), - svc_.cfg.headers.max_size + - svc_.cfg.min_buffer, + svc_.cfg.headers.max_size + svc_.cfg.min_buffer, leftover }; - BOOST_ASSERT(fb_.capacity() == - svc_.max_overread() - leftover); - h_ = detail::header( - detail::empty{h_.kind}); - h_.buf = reinterpret_cast< - char*>(ws_.data()); + BOOST_ASSERT( + fb_.capacity() == svc_.max_overread() - leftover); + + h_ = detail::header(detail::empty{h_.kind}); + h_.buf = reinterpret_cast(ws_.data()); h_.cbuf = h_.buf; h_.cap = ws_.size(); @@ -613,7 +603,9 @@ start_impl( chunk_remain_ = 0; needs_chunk_close_ = false; trailer_headers_ = false; + chunked_body_ended = false; filter_ = nullptr; + sink_ = nullptr; body_avail_ = 0; body_total_ = 0; } @@ -638,160 +630,123 @@ prepare() -> case state::header: { - BOOST_ASSERT(h_.size < - svc_.cfg.headers.max_size); - auto n = fb_.capacity() - fb_.size(); + BOOST_ASSERT( + h_.size < svc_.cfg.headers.max_size); + std::size_t n = fb_.capacity() - fb_.size(); BOOST_ASSERT(n <= svc_.max_overread()); - if( n > svc_.cfg.max_prepare) - n = svc_.cfg.max_prepare; + n = clamp(n, svc_.cfg.max_prepare); mbp_[0] = fb_.prepare(n); nprepare_ = n; - return mutable_buffers_type( - &mbp_[0], 1); + return mutable_buffers_type(&mbp_[0], 1); } case state::body: { if(got_eof_) - return mutable_buffers_type{}; + { + // forgot to call parse() + detail::throw_logic_error(); + } - do_body: if(! is_plain()) { // buffered payload - auto n = cb0_.capacity(); - if( n > svc_.cfg.max_prepare) - n = svc_.cfg.max_prepare; - mbp_ = cb0_.prepare(n); + std::size_t n = cb0_.capacity(); + n = clamp(n, svc_.cfg.max_prepare); nprepare_ = n; + mbp_ = cb0_.prepare(n); return mutable_buffers_type(mbp_); } - - // plain payload - - if(how_ == how::in_place) + else { - auto n = cb0_.capacity(); - if( n > svc_.cfg.max_prepare) - n = svc_.cfg.max_prepare; - - // TODO: payload_remain_ + svc_.max_overread() might overflow - if( h_.md.payload == payload::size && - n > payload_remain_ + svc_.max_overread()) - n = static_cast( - payload_remain_ + svc_.max_overread()); + switch(how_) + { + default: + case how::in_place: + case how::sink: + { + std::size_t n = cb0_.capacity(); + n = clamp(n, svc_.cfg.max_prepare); - mbp_ = cb0_.prepare(n); - nprepare_ = n; - return mutable_buffers_type(mbp_); - } + if(h_.md.payload == payload::size) + { + if(n > payload_remain_) + { + std::size_t overread = + n - static_cast(payload_remain_); + if(overread > svc_.max_overread()) + n = payload_remain_ + svc_.max_overread(); + } + } + else + { + BOOST_ASSERT( + h_.md.payload == payload::to_eof); - if(how_ == how::elastic) - { - // Overreads are not allowed, or - // else the caller will see extra - // unrelated data. + n = clamp( + svc_.cfg.body_limit - body_total_ + 1, n); + } - if(h_.md.payload == payload::size) - { - // set_body moves avail to dyn - BOOST_ASSERT(body_buf_->size() == 0); - BOOST_ASSERT(body_avail_ == 0); - auto n = static_cast(payload_remain_); - if( n > svc_.cfg.max_prepare) - n = svc_.cfg.max_prepare; nprepare_ = n; - return eb_->prepare(n); + mbp_ = cb0_.prepare(n); + return mutable_buffers_type(mbp_); } - - BOOST_ASSERT( - h_.md.payload == payload::to_eof); - std::size_t n = 0; - if(! got_eof_) + case how::elastic: { - // calculate n heuristically - n = svc_.cfg.min_buffer; - if( n > svc_.cfg.max_prepare) - n = svc_.cfg.max_prepare; - { - // apply max_size() - auto avail = - eb_->max_size() - - eb_->size(); - if( n > avail) - n = avail; - } - // fill capacity() first, - // to avoid an allocation + BOOST_ASSERT(cb0_.size() == 0); + BOOST_ASSERT(body_avail_ == 0); + + std::size_t n = svc_.cfg.min_buffer; + + if(h_.md.payload == payload::size) { - auto avail = - eb_->capacity() - - eb_->size(); - if( n > avail && - avail != 0) - n = avail; + // Overreads are not allowed, or + // else the caller will see extra + // unrelated data. + n = clamp(n, payload_remain_); } - if(n == 0) + else { - // dynamic buffer is full - // attempt a 1 byte read so - // we can detect overflow BOOST_ASSERT( - body_buf_->size() == 0); - // handled in init_dynamic - BOOST_ASSERT( - body_avail_ == 0); - mbp_ = body_buf_->prepare(1); - nprepare_ = 1; - return - mutable_buffers_type(mbp_); + h_.md.payload == payload::to_eof); + n = clamp( + svc_.cfg.body_limit - body_total_, n); + n = clamp( + n, eb_->max_size() - eb_->size()); + // fill capacity first to avoid an allocation + std::size_t avail = + eb_->capacity() - eb_->size(); + if(avail != 0) + n = clamp(n, avail); + + if(n == 0) + { + // dynamic buffer is full + // attempt a 1 byte read so + // we can detect overflow + nprepare_ = 1; + mbp_ = cb0_.prepare(1); + return mutable_buffers_type(mbp_); + } } + + n = clamp(n, svc_.cfg.max_prepare); + BOOST_ASSERT(n != 0); + nprepare_ = n; + return eb_->prepare(n); + } } - nprepare_ = n; - return eb_->prepare(n); } - - // VFALCO TODO - detail::throw_logic_error(); } case state::set_body: - { - if(how_ == how::elastic) - { - // attempt to transfer in-place - // body into the dynamic buffer. - system::error_code ec; - init_dynamic(ec); - if(! ec.failed()) - { - if(st_ == state::body) - goto do_body; - BOOST_ASSERT( - st_ == state::complete); - return mutable_buffers_type{}; - } - - // not enough room, so we - // return this error from parse() - return - mutable_buffers_type{}; - } - - if(how_ == how::sink) - { - // this is a no-op, to get the - // caller to call parse next. - return mutable_buffers_type{}; - } - - // VFALCO TODO + // forgot to call parse() detail::throw_logic_error(); - } + case state::complete_in_place: case state::complete: - // intended no-op - return mutable_buffers_type{}; + // already complete + detail::throw_logic_error(); } } @@ -800,6 +755,19 @@ parser:: commit( std::size_t n) { + if(n > nprepare_) + { + // n can't be greater than size of + // the buffers returned by prepare() + detail::throw_invalid_argument(); + } + + if(got_eof_) + { + // can't commit after EOF + detail::throw_logic_error(); + } + switch(st_) { default: @@ -817,19 +785,6 @@ commit( case state::header: { - if(n > nprepare_) - { - // n can't be greater than size of - // the buffers returned by prepare() - detail::throw_invalid_argument(); - } - - if(got_eof_) - { - // can't commit after EOF - detail::throw_logic_error(); - } - nprepare_ = 0; // invalidate fb_.commit(n); break; @@ -837,124 +792,38 @@ commit( case state::body: { - if(n > nprepare_) - { - // n can't be greater than size of - // the buffers returned by prepare() - detail::throw_invalid_argument(); - } - - BOOST_ASSERT(! got_eof_ || n == 0); - - if(! is_plain()) - { - // buffered payload - cb0_.commit(n); - break; - } - - // plain payload - - if(how_ == how::in_place) - { - BOOST_ASSERT(body_buf_ == &cb0_); - cb0_.commit(n); - if(h_.md.payload == payload::size) - { - if(n < payload_remain_) - { - body_avail_ += n; - body_total_ += n; - payload_remain_ -= n; - break; - } - body_avail_ += payload_remain_; - body_total_ += payload_remain_; - payload_remain_ = 0; - st_ = state::complete; - break; - } - - BOOST_ASSERT( - h_.md.payload == payload::to_eof); - body_avail_ += n; - body_total_ += n; - break; - } - - if(how_ == how::elastic) + nprepare_ = 0; // invalidate + if(is_plain() && how_ == how::elastic) { - if(eb_->size() < eb_->max_size()) - { - BOOST_ASSERT(body_avail_ == 0); - BOOST_ASSERT( - body_buf_->size() == 0); - eb_->commit(n); - } - else + if(eb_->max_size() == eb_->size()) { - // If we get here then either - // n==0 as a no-op, or n==1 for - // an intended one byte read. + // borrowed 1 byte from cb0_ BOOST_ASSERT(n <= 1); - body_buf_->commit(n); - body_avail_ += n; - } - body_total_ += n; - if(h_.md.payload == payload::size) - { - BOOST_ASSERT( - n <= payload_remain_); - payload_remain_ -= n; - if(payload_remain_ == 0) - st_ = state::complete; + cb0_.commit(n); + break; } - break; + eb_->commit(n); + payload_remain_ -= n; + body_total_ += n; } - - if(how_ == how::sink) + else { cb0_.commit(n); - break; } break; } case state::set_body: { - if(n > nprepare_) - { - // n can't be greater than size of - // the buffers returned by prepare() - detail::throw_invalid_argument(); - } - - BOOST_ASSERT(is_plain()); - BOOST_ASSERT(n == 0); - if( how_ == how::elastic || - how_ == how::sink) - { - // intended no-op - break; - } - - // VFALCO TODO + // forgot to call parse() detail::throw_logic_error(); } + case state::complete_in_place: case state::complete: { - BOOST_ASSERT(nprepare_ == 0); - - if(n > 0) - { - // n can't be greater than size of - // the buffers returned by prepare() - detail::throw_invalid_argument(); - } - - // intended no-op - break; + // already complete + detail::throw_logic_error(); } } } @@ -985,9 +854,10 @@ commit_eof() break; case state::set_body: - got_eof_ = true; - break; + // forgot to call parse() + detail::throw_logic_error(); + case state::complete_in_place: case state::complete: // can't commit eof when complete detail::throw_logic_error(); @@ -1035,7 +905,7 @@ parse( if(fb_.size() == 0) { // stream closed cleanly - st_ = state::complete; + st_ = state::complete_in_place; ec = BOOST_HTTP_PROTO_ERR( error::end_of_stream); return; @@ -1063,7 +933,7 @@ parse( on_headers(ec); if(ec.failed()) return; - if(st_ == state::complete) + if(st_ == state::complete_in_place) break; BOOST_FALLTHROUGH; @@ -1073,422 +943,357 @@ parse( { do_body: BOOST_ASSERT(st_ == state::body); - BOOST_ASSERT( - h_.md.payload != payload::none); - BOOST_ASSERT( - h_.md.payload != payload::error); + BOOST_ASSERT(h_.md.payload != payload::none); + BOOST_ASSERT(h_.md.payload != payload::error); - if( h_.md.payload == payload::chunked ) + auto set_state_to_complete = [&]() { - if( how_ == how::in_place ) + if(how_ == how::in_place) { - for(;;) + st_ = state::complete_in_place; + return; + } + st_ = state::complete; + }; + + if(h_.md.payload == payload::chunked) + { + for(;;) + { + if(chunk_remain_ == 0 + && !chunked_body_ended) { - if( chunk_remain_ == 0 ) + if(cb0_.size() == 0) { - auto cs = chained_sequence(cb0_.data()); + ec = BOOST_HTTP_PROTO_ERR( + error::need_data); + return; + } - if( needs_chunk_close_ ) - { - parse_eol(cs, ec); - if(ec) - return; - } - else if( trailer_headers_ ) - { - skip_trailer_headers(cs, ec); - if(ec) - return; - cb0_.consume(cb0_.size() - cs.size()); - st_ = state::complete; - return; - } + auto cs = chained_sequence(cb0_.data()); - auto chunk_size = parse_hex(cs, ec); + if(needs_chunk_close_) + { + parse_eol(cs, ec); if(ec) return; - - // chunk extensions are skipped - find_eol(cs, ec); + } + else if(trailer_headers_) + { + skip_trailer_headers(cs, ec); if(ec) return; - cb0_.consume(cb0_.size() - cs.size()); - chunk_remain_ = chunk_size; - - needs_chunk_close_ = true; - if( chunk_remain_ == 0 ) - { - needs_chunk_close_ = false; - trailer_headers_ = true; - continue; - } + chunked_body_ended = true; + continue; } + + auto chunk_size = parse_hex(cs, ec); + if(ec) + return; - if( cb0_.size() == 0 ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::need_data); + // skip chunk extensions + find_eol(cs, ec); + if(ec) return; + + cb0_.consume(cb0_.size() - cs.size()); + chunk_remain_ = chunk_size; + + needs_chunk_close_ = true; + if(chunk_remain_ == 0) + { + needs_chunk_close_ = false; + trailer_headers_ = true; + continue; } + } - if( cb1_.capacity() == 0 ) + if(cb0_.size() == 0 && !chunked_body_ended) + { + if(got_eof_) { ec = BOOST_HTTP_PROTO_ERR( - error::in_place_overflow); + error::incomplete); + st_ = state::reset; return; } - auto chunk = buffers::prefix(cb0_.data(), - clamp(chunk_remain_, cb0_.size())); + ec = BOOST_HTTP_PROTO_ERR( + error::need_data); + return; + } - if( filter_ ) - { - // TODO: gather available chunks and provide - // them as a const_buffer_span - auto rs = filter_->process( - cb1_.prepare(cb1_.capacity()), - chunk, - !trailer_headers_); + if(filter_) + { + chunk_remain_ -= apply_filter( + ec, + clamp(chunk_remain_, cb0_.size()), + !chunked_body_ended); - chunk_remain_ -= rs.in_bytes; - body_avail_ += rs.out_bytes; - body_total_ += rs.out_bytes; - cb0_.consume(rs.in_bytes); - cb1_.commit(rs.out_bytes); + if(ec || chunked_body_ended) + return; + } + else + { + const std::size_t chunk_avail = + clamp(chunk_remain_, cb0_.size()); + const auto chunk = + buffers::prefix(cb0_.data(), chunk_avail); - if( rs.ec.failed() || - (rs.finished && chunk_remain_ != 0) ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::bad_payload); - return; - } + if(svc_.cfg.body_limit - body_total_ + < chunk_avail) + { + ec = BOOST_HTTP_PROTO_ERR( + error::body_too_large); + st_ = state::reset; + return; } - else + + switch(how_) + { + case how::in_place: { auto copied = buffers::buffer_copy( - cb1_.prepare(cb1_.capacity()), chunk); + cb1_.prepare(cb1_.capacity()), + chunk); chunk_remain_ -= copied; body_avail_ += copied; body_total_ += copied; cb0_.consume(copied); cb1_.commit(copied); + if(cb1_.capacity() == 0 + && !chunked_body_ended) + { + ec = BOOST_HTTP_PROTO_ERR( + error::in_place_overflow); + return; + } + break; + } + case how::sink: + { + auto sink_rs = sink_->write( + chunk, !chunked_body_ended); + chunk_remain_ -= sink_rs.bytes; + body_total_ += sink_rs.bytes; + cb0_.consume(sink_rs.bytes); + if(sink_rs.ec.failed()) + { + body_avail_ += + chunk_avail - sink_rs.bytes; + ec = sink_rs.ec; + st_ = state::reset; + return; + } + break; + } + case how::elastic: + { + if(eb_->max_size() - eb_->size() + < chunk_avail) + { + ec = BOOST_HTTP_PROTO_ERR( + error::buffer_overflow); + st_ = state::reset; + return; + } + buffers::buffer_copy( + eb_->prepare(chunk_avail), + chunk); + chunk_remain_ -= chunk_avail; + body_total_ += chunk_avail; + cb0_.consume(chunk_avail); + eb_->commit(chunk_avail); + break; + } } - if( body_total_ > svc_.cfg.body_limit ) + if(chunked_body_ended) { - ec = BOOST_HTTP_PROTO_ERR( - error::body_too_large); - st_ = state::reset; // unrecoverable + set_state_to_complete(); return; } } } - else - { - // TODO - detail::throw_logic_error(); - } } - else if( filter_ ) + else { - if( how_ == how::in_place ) - { - auto rs = [&]() -> detail::filter::results - { - if( h_.md.payload == payload::size ) - { - auto rv = filter_->process( - body_buf_->prepare(body_buf_->capacity()), - buffers::prefix(cb0_.data(), clamp( - payload_remain_, cb0_.size())), - cb0_.size() < payload_remain_); - - payload_remain_ -= rv.in_bytes; - return rv; - } - BOOST_ASSERT(h_.md.payload == payload::to_eof); - return filter_->process( - body_buf_->prepare(body_buf_->capacity()), - cb0_.data(), - !got_eof_); - }(); + // non-chunked payload - ec = rs.ec; - body_avail_ += rs.out_bytes; - body_total_ += rs.out_bytes; - cb0_.consume(rs.in_bytes); - body_buf_->commit(rs.out_bytes); + const std::size_t payload_avail = [&]() + { + auto ret = cb0_.size(); + if(!filter_) + ret -= body_avail_; + if(h_.md.payload == payload::size) + return clamp(payload_remain_, ret); + // payload::eof + return ret; + }(); + + const bool is_complete = [&]() + { + if(h_.md.payload == payload::size) + return payload_avail == payload_remain_; + // payload::eof + return got_eof_; + }(); - if( body_total_ > svc_.cfg.body_limit ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::body_too_large); - st_ = state::reset; // unrecoverable + if(filter_) + { + payload_remain_ -= apply_filter( + ec, payload_avail, !is_complete); + if(ec || is_complete) return; - } - - if( ec.failed() ) + } + else + { + switch(how_) { - st_ = state::reset; // unrecoverable - return; - } - - if( rs.finished ) + case how::in_place: { - if( !got_eof_ && - h_.md.payload == payload::to_eof ) + payload_remain_ -= payload_avail; + body_avail_ += payload_avail; + body_total_ += payload_avail; + if(cb0_.capacity() == 0 && !is_complete) { ec = BOOST_HTTP_PROTO_ERR( - error::need_data); + error::in_place_overflow); return; } - - st_ = state::complete; - return; - } - - if( body_buf_->capacity() == 0 ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::in_place_overflow); - return; - } - - if( got_eof_ ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::incomplete); - st_ = state::reset; // unrecoverable - return; + break; } - - ec = BOOST_HTTP_PROTO_ERR( - error::need_data); - return; - } - else - { - // TODO - detail::throw_logic_error(); - } - } - - if(how_ == how::in_place) - { - if(h_.md.payload == payload::size) - { - if(body_avail_ < - h_.md.payload_size) + case how::sink: { - if(got_eof_) + payload_remain_ -= payload_avail; + body_total_ += payload_avail; + auto sink_rs = sink_->write( + buffers::prefix( + cb0_.data(), + payload_avail), + !is_complete); + cb0_.consume(sink_rs.bytes); + if(sink_rs.ec.failed()) { - // incomplete - ec = BOOST_HTTP_PROTO_ERR( - error::incomplete); + body_avail_ += + payload_avail - sink_rs.bytes; + ec = sink_rs.ec; + st_ = state::reset; return; } - if(body_buf_->capacity() == 0) + break; + } + case how::elastic: + { + // payload_remain_ and body_total_ + // are already updated in commit() + + if(cb0_.size() != 0) { - // in_place buffer limit - ec = BOOST_HTTP_PROTO_ERR( - error::in_place_overflow); + // a non-empty cb0_ indicates that + // the elastic buffer ran out of space + ec = BOOST_HTTP_PROTO_ERR( + error::buffer_overflow); + st_ = state::reset; return; } - ec = BOOST_HTTP_PROTO_ERR( - error::need_data); - return; + break; + } } - BOOST_ASSERT(body_avail_ == - h_.md.payload_size); - st_ = state::complete; - break; - } - if( body_total_ > svc_.cfg.body_limit ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::body_too_large); - st_ = state::reset; // unrecoverable - return; - } - if( ! got_eof_ ) - { - ec = BOOST_HTTP_PROTO_ERR( - error::need_data); - return; - } - BOOST_ASSERT(got_eof_); - st_ = state::complete; - break; - } - if(how_ == how::elastic) - { - // state already updated in commit - if(h_.md.payload == payload::size) - { - BOOST_ASSERT(body_total_ < - h_.md.payload_size); - BOOST_ASSERT(payload_remain_ > 0); - if(body_avail_ != 0) + if(body_total_ > svc_.cfg.body_limit) { - BOOST_ASSERT( - eb_->max_size() - - eb_->size() < - payload_remain_); - ec = BOOST_HTTP_PROTO_ERR( - error::buffer_overflow); - st_ = state::reset; // unrecoverable + ec = BOOST_HTTP_PROTO_ERR( + error::body_too_large); + st_ = state::reset; return; } - if(got_eof_) + + if(is_complete) { - ec = BOOST_HTTP_PROTO_ERR( - error::incomplete); - st_ = state::reset; // unrecoverable + set_state_to_complete(); return; } - return; } - BOOST_ASSERT( - h_.md.payload == payload::to_eof); - if( eb_->size() == eb_->max_size() && - body_avail_ > 0) + + if(h_.md.payload == payload::size && got_eof_) { - // got here from the 1-byte read ec = BOOST_HTTP_PROTO_ERR( - error::buffer_overflow); - st_ = state::reset; // unrecoverable + error::incomplete); + st_ = state::reset; return; } - if(got_eof_) - { - BOOST_ASSERT(body_avail_ == 0); - st_ = state::complete; - break; - } - BOOST_ASSERT(body_avail_ == 0); - break; + + ec = BOOST_HTTP_PROTO_ERR( + error::need_data); + return; } - // VFALCO TODO - detail::throw_logic_error(); + break; } case state::set_body: + case state::complete_in_place: { - // transfer in_place data into set body + auto& body_buf = is_plain() ? cb0_ : cb1_; - if(how_ == how::elastic) - { - init_dynamic(ec); - if(! ec.failed()) - { - if(st_ == state::body) - goto do_body; - BOOST_ASSERT( - st_ == state::complete); - break; - } - st_ = state::reset; // unrecoverable - return; - } + BOOST_ASSERT(body_avail_ == body_buf.size()); + BOOST_ASSERT(body_total_ == body_avail_); - if(how_ == how::sink) - { - auto n = body_buf_->size(); - if(h_.md.payload == payload::size) - { - // sink_->size_hint(h_.md.payload_size, ec); - - if(n < h_.md.payload_size) - { - auto rv = sink_->write( - body_buf_->data(), false); - BOOST_ASSERT(rv.ec.failed() || - rv.bytes == body_buf_->size()); - BOOST_ASSERT( - rv.bytes >= body_avail_); - BOOST_ASSERT( - rv.bytes < payload_remain_); - body_buf_->consume(rv.bytes); - body_avail_ -= rv.bytes; - body_total_ += rv.bytes; - payload_remain_ -= rv.bytes; - if(rv.ec.failed()) - { - ec = rv.ec; - st_ = state::reset; // unrecoverable - return; - } - st_ = state::body; - goto do_body; - } - - n = static_cast(h_.md.payload_size); - } - // complete - BOOST_ASSERT(body_buf_ == &cb0_); - auto rv = sink_->write( - body_buf_->data(), true); - BOOST_ASSERT(rv.ec.failed() || - rv.bytes == body_buf_->size()); - body_buf_->consume(rv.bytes); - if(rv.ec.failed()) - { - ec = rv.ec; - st_ = state::reset; // unrecoverable - return; - } - st_ = state::complete; - return; - } - - // VFALCO TODO - detail::throw_logic_error(); - } - - case state::complete: - { - // This is a no-op except when set_body - // was called and we have in-place data. switch(how_) { - default: case how::in_place: - break; - - case how::elastic: + return; // no-op + case how::sink: { - if(body_buf_->size() == 0) - break; - BOOST_ASSERT(eb_->size() == 0); - auto n = buffers::buffer_copy( - eb_->prepare( - body_buf_->size()), - body_buf_->data()); - body_buf_->consume(n); + auto rs = sink_->write( + buffers::prefix( + body_buf.data(), + body_avail_), + st_ == state::set_body); + body_buf.consume(rs.bytes); + body_avail_ -= rs.bytes; + if(rs.ec.failed()) + { + ec = rs.ec; + st_ = state::reset; + return; + } break; } - - case how::sink: + case how::elastic: { - if(body_buf_->size() == 0) - break; - auto rv = sink_->write( - body_buf_->data(), false); - body_buf_->consume(rv.bytes); - if(rv.ec.failed()) + if(eb_->max_size() - eb_->size() + < body_avail_) { - ec = rv.ec; - st_ = state::reset; // unrecoverable + ec = BOOST_HTTP_PROTO_ERR( + error::buffer_overflow); return; } + buffers::buffer_copy( + eb_->prepare(body_avail_), + body_buf.data()); + body_buf.consume(body_avail_); + eb_->commit(body_avail_); + body_avail_ = 0; + // TODO: expand cb_0 when possible? break; } } + + if(st_ == state::set_body) + { + st_ = state::body; + goto do_body; + } + + st_ = state::complete; + break; } + + case state::complete: + break; } } @@ -1502,12 +1307,11 @@ pull_body() -> switch(st_) { case state::body: - case state::complete: - if(how_ != how::in_place) - detail::throw_logic_error(); - cbp_ = buffers::prefix(body_buf_->data(), - static_cast(body_avail_)); - return const_buffers_type{ cbp_ }; + case state::complete_in_place: + cbp_ = buffers::prefix( + (is_plain() ? cb0_ : cb1_).data(), + body_avail_); + return const_buffers_type(cbp_); default: detail::throw_logic_error(); } @@ -1520,11 +1324,9 @@ consume_body(std::size_t n) switch(st_) { case state::body: - case state::complete: - if(how_ != how::in_place) - detail::throw_logic_error(); - BOOST_ASSERT(n <= body_avail_); - body_buf_->consume(n); + case state::complete_in_place: + n = clamp(n, body_avail_); + (is_plain() ? cb0_ : cb1_).consume(n); body_avail_ -= n; return; default: @@ -1536,31 +1338,16 @@ core::string_view parser:: body() const noexcept { - switch(st_) + if(st_ == state::complete_in_place) { - default: - case state::reset: - case state::start: - case state::header: - case state::body: - case state::set_body: - // not complete - return {}; - - case state::complete: - if(how_ != how::in_place) - { - // not in_place - return {}; - } - auto cbp = body_buf_->data(); + auto cbp = (is_plain() ? cb0_ : cb1_).data(); BOOST_ASSERT(cbp[1].size() == 0); BOOST_ASSERT(cbp[0].size() == body_avail_); return core::string_view( - static_cast( - cbp[0].data()), - static_cast(body_avail_)); + static_cast(cbp[0].data()), + body_avail_); } + return {}; } core::string_view @@ -1594,8 +1381,7 @@ parser:: is_plain() const noexcept { return ! filter_ && - h_.md.payload != - payload::chunked; + h_.md.payload != payload::chunked; } // Called immediately after complete headers are received @@ -1640,8 +1426,7 @@ on_headers( ws_.data(), overread + fb_.capacity(), overread }; - body_buf_ = &cb0_; - st_ = state::complete; + st_ = state::complete_in_place; return; } @@ -1670,48 +1455,37 @@ on_headers( ws_.reserve_front(svc_.max_codec); } - if( !filter_ && - h_.md.payload != payload::chunked ) + if(h_.md.payload == payload::size ) + payload_remain_ = h_.md.payload_size; + + if(is_plain()) { cb0_ = { p, cap, overread }; - body_buf_ = &cb0_; - body_avail_ = cb0_.size(); - - if( h_.md.payload == payload::size ) + if(h_.md.payload == payload::size) { - if( h_.md.payload_size > - svc_.cfg.body_limit ) + if(h_.md.payload_size > + svc_.cfg.body_limit) { ec = BOOST_HTTP_PROTO_ERR( error::body_too_large); - st_ = state::reset; // unrecoverable + st_ = state::reset; return; } - - if( body_avail_ >= h_.md.payload_size ) - body_avail_ = h_.md.payload_size; - - payload_remain_ = - h_.md.payload_size - body_avail_; } - - body_total_ = body_avail_; st_ = state::body; return; } - if( h_.md.payload == payload::size ) - payload_remain_ = h_.md.payload_size; + // buffered payload - auto const n0 = overread > svc_.cfg.min_buffer ? - overread : svc_.cfg.min_buffer; - auto const n1 = svc_.cfg.min_buffer; + const auto n0 = (overread > svc_.cfg.min_buffer) + ? overread + : svc_.cfg.min_buffer; + const auto n1 = svc_.cfg.min_buffer; cb0_ = { p , n0, overread }; cb1_ = { p + n0 , n1 }; - body_buf_ = &cb1_; - - st_ = state::body; + st_ = state::body; } // Called at the end of set_body @@ -1727,80 +1501,146 @@ on_set_body() nprepare_ = 0; // invalidate - if(how_ == how::elastic) + if(st_ == state::body) { - if(h_.md.payload == payload::none) - { - BOOST_ASSERT(st_ == state::complete); - return; - } - - st_ = state::set_body; - return; - } - - if(how_ == how::sink) - { - if(h_.md.payload == payload::none) - { - BOOST_ASSERT(st_ == state::complete); - // force a trip through parse so - // we can calculate any error. - st_ = state::set_body; - return; - } - st_ = state::set_body; return; } - // VFALCO TODO - detail::throw_logic_error(); + BOOST_ASSERT( + st_ == state::complete_in_place); } -void +std::size_t parser:: -init_dynamic( - system::error_code& ec) +apply_filter( + system::error_code& ec, + std::size_t payload_avail, + bool more) { - // attempt to transfer in-place - // body into the dynamic buffer. - BOOST_ASSERT( - body_avail_ == body_buf_->size()); - BOOST_ASSERT( - body_total_ == body_avail_); - - auto const space_left = - eb_->max_size() - eb_->size(); + std::size_t p0 = payload_avail; - if(space_left < body_avail_) + for(;;) { - ec = BOOST_HTTP_PROTO_ERR( - error::buffer_overflow); - return; - } + if(payload_avail == 0 && more) + return p0 - payload_avail; - eb_->commit( - buffers::buffer_copy( - eb_->prepare(static_cast(body_avail_)), - body_buf_->data())); - body_buf_->consume(static_cast(body_avail_)); - body_avail_ = 0; - BOOST_ASSERT( - body_buf_->size() == 0); + auto f_rs = [&](){ + if(how_ == how::elastic) + { + std::size_t n = clamp( + svc_.cfg.body_limit - body_total_); + n = clamp(n, svc_.cfg.min_buffer); + n = clamp(n, eb_->max_size() - eb_->size()); + + // fill capacity first to avoid + // an allocation + std::size_t avail = + eb_->capacity() - eb_->size(); + if(avail != 0) + n = clamp(n, avail); + + return filter_->process( + eb_->prepare(n), + buffers::prefix( + cb0_.data(), + payload_avail), + more); + } + else // in-place and sink + { + std::size_t n = clamp( + svc_.cfg.body_limit - body_total_); + n = clamp(n, cb1_.capacity()); + + return filter_->process( + cb1_.prepare(n), + buffers::prefix( + cb0_.data(), + payload_avail), + more); + } + }(); - // TODO: expand cb_0? + cb0_.consume(f_rs.in_bytes); + payload_avail -= f_rs.in_bytes; + body_total_ += f_rs.out_bytes; - // TODO: we need a better way to recover the state. - if( !filter_ && - h_.md.payload == payload::size && - body_total_ == h_.md.payload_size) - { - st_ = state::complete; - return; - } + bool needs_more_space = !f_rs.finished && + payload_avail != 0; - st_ = state::body; + switch(how_) + { + case how::in_place: + { + cb1_.commit(f_rs.out_bytes); + body_avail_ += f_rs.out_bytes; + if( cb1_.capacity() == 0 && + needs_more_space) + { + ec = BOOST_HTTP_PROTO_ERR( + error::in_place_overflow); + return p0 - payload_avail; + } + break; + } + case how::sink: + { + cb1_.commit(f_rs.out_bytes); + auto sink_rs = sink_->write( + cb1_.data(), !f_rs.finished); + cb1_.consume(sink_rs.bytes); + if(sink_rs.ec.failed()) + { + ec = sink_rs.ec; + st_ = state::reset; + return p0 - payload_avail; + } + break; + } + case how::elastic: + { + eb_->commit(f_rs.out_bytes); + if( eb_->max_size() - eb_->size() == 0 && + needs_more_space) + { + ec = BOOST_HTTP_PROTO_ERR( + error::buffer_overflow); + st_ = state::reset; + return p0 - payload_avail; + } + break; + } + } + + if(f_rs.ec.failed()) + { + ec = f_rs.ec; + st_ = state::reset; + return p0 - payload_avail; + } + + if( svc_.cfg.body_limit == body_total_ && + needs_more_space) + { + ec = BOOST_HTTP_PROTO_ERR( + error::body_too_large); + st_ = state::reset; + return p0 - payload_avail; + } + + if(f_rs.finished) + { + if(!more) + { + st_ = (how_ == how::in_place) + ? state::complete_in_place + : state::complete; + } + + return p0 - payload_avail; + } + } } } // http_proto diff --git a/test/unit/parser.cpp b/test/unit/parser.cpp index ffc36df3..c07e2c61 100644 --- a/test/unit/parser.cpp +++ b/test/unit/parser.cpp @@ -542,7 +542,8 @@ struct parser_test auto const check_dynamic = [&]( request_parser::config const& cfg, std::size_t n, - core::string_view s) + core::string_view s, + system::error_code ex = error::need_data) { context ctx; install_parser_service(ctx, cfg); @@ -579,11 +580,16 @@ struct parser_test buffers::string_buffer sb( &tmp, dynamic_max_size); pr->set_body(std::move(sb)); - parser::mutable_buffers_type dest; - BOOST_TEST_NO_THROW( - dest = pr->prepare()); - BOOST_TEST_EQ( - buffers::buffer_size(dest), n); + pr->parse(ec); + BOOST_TEST(ec == ex); + if(! ec.failed()) + { + parser::mutable_buffers_type dest; + BOOST_TEST_NO_THROW( + dest = pr->prepare()); + BOOST_TEST_EQ( + buffers::buffer_size(dest), n); + } // the parser must be manually reset() to clear its inner workspace // otherwise, ~workspace itself will wind up clearing the registered @@ -673,6 +679,8 @@ struct parser_test BOOST_TEST_GT(s.capacity(), 0); BOOST_TEST_LT(s.capacity(), 5000); pr.set_body(buffers::string_buffer(&s)); + pr.parse(ec); + BOOST_TEST(ec == error::need_data); auto dest = pr.prepare(); BOOST_TEST_LE( buffers::buffer_size(dest), @@ -690,7 +698,8 @@ struct parser_test "HTTP/1.1 200 OK\r\n" "Content-Length: 10\r\n" "\r\n" - "1234567890"); + "1234567890", + error::buffer_overflow); } // prepare when complete @@ -708,10 +717,9 @@ struct parser_test BOOST_TEST(! ec.failed()); BOOST_TEST(pr.is_complete()); parser::mutable_buffers_type dest; - BOOST_TEST_NO_THROW( - dest = pr.prepare()); - BOOST_TEST_EQ( - buffers::buffer_size(dest), 0); + BOOST_TEST_THROWS( + dest = pr.prepare(), + std::logic_error); } } @@ -916,7 +924,8 @@ struct parser_test auto &s = *ps; pr.set_body( buffers::string_buffer(&s)); - pr.commit(0); + pr.parse(ec); + BOOST_TEST(! ec); pr.reset(); } @@ -960,10 +969,11 @@ struct parser_test system::error_code ec; pieces in = { "GET / HTTP/1.1\r\n" + "Content-Length: 1\r\n" "\r\n" }; read_header(pr, in, ec); BOOST_TEST(! ec.failed()); - BOOST_TEST(pr.is_complete()); + BOOST_TEST(!pr.is_complete()); auto dest = pr.prepare(); ignore_unused(dest); BOOST_TEST_NO_THROW(pr.commit(0)); @@ -977,14 +987,16 @@ struct parser_test system::error_code ec; pieces in = { "GET / HTTP/1.1\r\n" + "Content-Length: 1\r\n" "\r\n" }; read_header(pr, in, ec); BOOST_TEST(! ec.failed()); - BOOST_TEST(pr.is_complete()); + BOOST_TEST(!pr.is_complete()); auto dest = pr.prepare(); ignore_unused(dest); BOOST_TEST_THROWS( - pr.commit(1), + pr.commit( + buffers::buffer_size(dest) + 1), std::invalid_argument); } } @@ -1051,6 +1063,8 @@ struct parser_test auto &s = *ps; pr.set_body( buffers::string_buffer(&s)); + pr.parse(ec); + BOOST_TEST(ec == error::need_data); BOOST_TEST_NO_THROW( pr.commit_eof()); pr.reset(); @@ -1190,9 +1204,15 @@ struct parser_test "12345" }); } -#if 0 - // chunked, need data -#endif + { + // chunked, need data + response_parser::config cfg; + check_in_place(cfg, true, + error::need_data, false, { + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n"}); + } { // to_eof, complete @@ -1279,6 +1299,7 @@ struct parser_test } buffers::flat_buffer fb(buf, sizeof(buf)); pr_->set_body(std::ref(fb)); + pr_->parse(ec); BOOST_TEST(pr_->body().empty()); if(! pr_->is_complete()) { @@ -1316,6 +1337,7 @@ struct parser_test return; } auto& ts = pr_->set_body(test_sink{}); + pr_->parse(ec); BOOST_TEST(pr_->body().empty()); if(! pr_->is_complete()) { @@ -1355,12 +1377,10 @@ struct parser_test } // sink -#if 0 { auto in = in0; check_sink(in, ex); } -#endif } void @@ -1381,13 +1401,10 @@ struct parser_test } // sink -#if 0 { auto in = in0; - check_sink( - res_pr_, in, ex); + check_sink(in, ex); } -#endif } // void Fn( pieces& ) diff --git a/test/unit/zlib.cpp b/test/unit/zlib.cpp index d6d2440f..d37574ff 100644 --- a/test/unit/zlib.cpp +++ b/test/unit/zlib.cpp @@ -50,7 +50,6 @@ TEST_SUITE( #include #include -#include #include #include @@ -223,10 +222,10 @@ struct zlib_test span body_view, span output) { - struct sink : public source + struct source_t : public source { span& body_view_; - sink(span& bv) : body_view_(bv) {} + source_t(span& bv) : body_view_(bv) {} results on_read(buffers::mutable_buffer b) @@ -251,7 +250,7 @@ struct zlib_test buffers::mutable_buffer output_buf( output.data(), output.size()); - auto& s = sr.start(res, body_view); + auto& s = sr.start(res, body_view); (void)s; while(! body_view.empty() || ! sr.is_done() ) @@ -535,6 +534,140 @@ struct zlib_test zlib_serializer_impl(fp, c, body, b); } + static + std::string + pull_body( + response_parser& pr, + buffers::const_buffer input) + { + std::string rs; + buffers::string_buffer buf(&rs); + for(;;) + { + auto n1 = buffers::buffer_copy( + pr.prepare(), input); + pr.commit(n1); + input = buffers::sans_prefix(input, n1); + + boost::system::error_code ec; + pr.parse(ec); + if( ec ) + BOOST_TEST(ec == error::in_place_overflow + || ec == error::need_data); + + // consume in_place body + auto n2 = buffers::buffer_copy( + buf.prepare(buffers::buffer_size(pr.pull_body())), + pr.pull_body()); + buf.commit(n2); + pr.consume_body(n2); + + if( input.size() == 0 && ec == error::need_data ) + { + pr.commit_eof(); + pr.parse(ec); + BOOST_TEST(!ec || ec == error::in_place_overflow); + } + if( pr.is_complete() ) + break; + } + return rs; + } + + static + std::string + elastic_body( + response_parser& pr, + buffers::const_buffer input) + { + std::string rs; + std::size_t n = buffers::buffer_copy( + pr.prepare(), input); + input = buffers::sans_prefix(input, n); + pr.commit(n); + system::error_code ec; + pr.parse(ec); + BOOST_TEST(pr.got_header()); + + buffers::string_buffer buf(&rs); + pr.set_body(std::ref(buf)); + pr.parse(ec); + + while(ec == error::need_data) + { + std::size_t n = buffers::buffer_copy( + pr.prepare(), input); + input = buffers::sans_prefix(input, n); + pr.commit(n); + pr.parse(ec); + if(n == 0) + { + pr.commit_eof(); + pr.parse(ec); + break; + } + } + return rs; + } + + static + std::string + sink_body( + response_parser& pr, + buffers::const_buffer input) + { + std::string rs; + std::size_t n = buffers::buffer_copy( + pr.prepare(), input); + input = buffers::sans_prefix(input, n); + pr.commit(n); + system::error_code ec; + pr.parse(ec); + BOOST_TEST(pr.got_header()); + + class sink_t : public sink + { + std::string* body_; + + public: + sink_t(std::string* body) + : body_{ body } + { + } + + results + on_write( + buffers::const_buffer b, + bool) override + { + body_->append( + static_cast< + const char*>(b.data()), + b.size()); + return { {}, b.size() }; + } + }; + + pr.set_body(&rs); + pr.parse(ec); + + while(ec == error::need_data) + { + std::size_t n = buffers::buffer_copy( + pr.prepare(), input); + input = buffers::sans_prefix(input, n); + pr.commit(n); + pr.parse(ec); + if(n == 0) + { + pr.commit_eof(); + pr.parse(ec); + break; + } + } + return rs; + } + void test_parser() { @@ -565,74 +698,56 @@ struct zlib_test response_parser::config cfg; cfg.apply_deflate_decoder = true; cfg.apply_gzip_decoder = true; - cfg.body_limit = 8 * 1024 * 1024; + cfg.body_limit = 1024 * 1024; install_parser_service(ctx, cfg); response_parser pr(ctx); + pr.reset(); - for(auto gzip : { false, true }) - for(auto chunked : { false, true }) + for(std::string encoding : { "gzip", "deflate" }) + for(std::string transfer : { "chunked", "sized", "to_eof" }) for(auto body_size : { 0, 7, 64 * 1024, 1024 * 1024 }) + for(auto receiver : { pull_body, sink_body, elastic_body }) { - std::string msg = "HTTP/1.1 200 OK\r\n"; + std::string msg = + "HTTP/1.1 200 OK\r\n" + "Content-Encoding: " + encoding + "\r\n"; - if(gzip) - msg += "Content-Encoding: gzip\r\n"; - else - msg += "Content-Encoding: deflate\r\n"; + auto body = generate_book(body_size); + auto deflated_body = deflate( + 15 + (encoding == "gzip" ? 16 : 0), + 8, + body); - if(chunked) + if(transfer == "chunked") + { msg += "Transfer-Encoding: chunked\r\n"; + msg += "\r\n"; + append_chunked(msg, deflated_body); + } + else if(transfer == "sized") + { + msg += "Content-Length: " + + std::to_string(deflated_body.size()) + "\r\n"; + msg += "\r\n"; + msg += deflated_body; + } + else // to_eof + { + msg += "\r\n"; + msg += deflated_body; + } - msg += "\r\n"; - - auto const body = generate_book(body_size); - auto const deflated = deflate(15 + (gzip ? 16 : 0), 8, body); - - if(chunked) - append_chunked(msg, deflated); - else - msg += deflated; - - pr.reset(); pr.start(); - auto msg_buf = - buffers::const_buffer{ msg.data(), msg.size() }; - - std::string parsed_body; - buffers::string_buffer parsed_body_buf{ - &parsed_body }; - for(;;) - { - auto n1 = buffers::buffer_copy( - pr.prepare(), msg_buf); - pr.commit(n1); - msg_buf = buffers::sans_prefix(msg_buf, n1); + auto rs = receiver( + pr, + buffers::const_buffer( + msg.data(), msg.size())); - boost::system::error_code ec; - pr.parse(ec); - if( ec ) - BOOST_TEST(ec == error::in_place_overflow - || ec == error::need_data); - - // consume in_place body - auto n2 = buffers::buffer_copy( - parsed_body_buf.prepare( - buffers::buffer_size(pr.pull_body())), - pr.pull_body()); - parsed_body_buf.commit(n2); - pr.consume_body(n2); - - if( msg_buf.size() == 0 && ec == error::need_data ) - { - pr.commit_eof(); - pr.parse(ec); - BOOST_TEST(!ec || ec == error::in_place_overflow); - } - if( pr.is_complete() ) - break; - } - BOOST_TEST(parsed_body == body); + BOOST_TEST(rs == body); + + if(transfer == "to_eof") + pr.reset(); } } @@ -679,6 +794,7 @@ struct zlib_test BOOST_TEST_EQ(n, msg.size()); pr.commit(n); + pr.commit_eof(); boost::system::error_code ec; pr.parse(ec);