diff --git a/src/cow_http3_machine.erl b/src/cow_http3_machine.erl index 54cce0124..3b9abcbd2 100644 --- a/src/cow_http3_machine.erl +++ b/src/cow_http3_machine.erl @@ -31,7 +31,7 @@ -export([get_stream_remote_state/2]). -record(stream, { - ref :: any(), %% @todo specs + id :: non_neg_integer(), dir :: unidi_local | unidi_remote | bidi, type :: undefined | req | control | push | encoder | decoder, @@ -65,13 +65,9 @@ %% Maximum Push ID. max_push_id = -1 :: -1 | non_neg_integer(), - %% Quick pointers for commonly used streams. - local_encoder_ref :: any(), %% @todo specs - local_decoder_ref :: any(), %% @todo specs - %% Currently active HTTP/3 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = #{} :: #{reference() => stream()}, + streams = #{} :: #{non_neg_integer() => stream()}, %% @todo Maybe merge these two in some kind of control stream state. has_peer_control_stream = false :: boolean(), has_received_peer_settings = false :: boolean(), @@ -88,22 +84,20 @@ init(Mode, _Opts) -> -spec init_unidi_local_streams(_, _ ,_ ,_) -> _. %% @todo -init_unidi_local_streams(ControlRef, EncoderRef, DecoderRef, +init_unidi_local_streams(ControlID, EncoderID, DecoderID, State=#http3_machine{streams=Streams}) -> State#http3_machine{ - local_encoder_ref=EncoderRef, - local_decoder_ref=DecoderRef, streams=Streams#{ - ControlRef => #stream{ref=ControlRef, dir=unidi_local, type=control}, - EncoderRef => #stream{ref=EncoderRef, dir=unidi_local, type=encoder}, - DecoderRef => #stream{ref=DecoderRef, dir=unidi_local, type=decoder} + ControlID => #stream{id=ControlID, dir=unidi_local, type=control}, + EncoderID => #stream{id=EncoderID, dir=unidi_local, type=encoder}, + DecoderID => #stream{id=DecoderID, dir=unidi_local, type=decoder} }}. -spec init_unidi_stream(_, _, _) -> _. %% @todo -init_unidi_stream(StreamRef, StreamDir, State=#http3_machine{streams=Streams}) -> - State#http3_machine{streams=Streams#{StreamRef => #stream{ - ref=StreamRef, dir=StreamDir, type=undefined}}}. +init_unidi_stream(StreamID, StreamDir, State=#http3_machine{streams=Streams}) -> + State#http3_machine{streams=Streams#{StreamID => #stream{ + id=StreamID, dir=StreamDir, type=undefined}}}. -spec set_unidi_remote_stream_type(_, _, _) -> _. %% @todo @@ -111,40 +105,40 @@ set_unidi_remote_stream_type(_, control, State=#http3_machine{has_peer_control_s {error, {connection_error, h3_stream_creation_error, 'There can be only one control stream; yet a second one was opened. (RFC9114 6.2.1)'}, State}; -set_unidi_remote_stream_type(StreamRef, Type, +set_unidi_remote_stream_type(StreamID, Type, State=#http3_machine{streams=Streams, has_peer_control_stream=HasControl}) -> - #{StreamRef := Stream} = Streams, + #{StreamID := Stream} = Streams, {ok, State#http3_machine{ - streams=Streams#{StreamRef => Stream#stream{type=Type}}, + streams=Streams#{StreamID => Stream#stream{type=Type}}, has_peer_control_stream=HasControl orelse (Type =:= control) }}. -spec init_bidi_stream(_, _) -> _. %% @todo %% All bidi streams are request/response. -init_bidi_stream(StreamRef, State=#http3_machine{streams=Streams}) -> - State#http3_machine{streams=Streams#{StreamRef => #stream{ - ref=StreamRef, dir=bidi, type=req}}}. +init_bidi_stream(StreamID, State=#http3_machine{streams=Streams}) -> + State#http3_machine{streams=Streams#{StreamID => #stream{ + id=StreamID, dir=bidi, type=req}}}. -spec init_bidi_stream(_, _, _) -> _. %% @todo %% All bidi streams are request/response. -init_bidi_stream(StreamRef, Method, State=#http3_machine{streams=Streams}) -> - State#http3_machine{streams=Streams#{StreamRef => #stream{ - ref=StreamRef, dir=bidi, type=req, method=Method}}}. +init_bidi_stream(StreamID, Method, State=#http3_machine{streams=Streams}) -> + State#http3_machine{streams=Streams#{StreamID => #stream{ + id=StreamID, dir=bidi, type=req, method=Method}}}. %% @todo set_bidi_method? -spec close_bidi_stream_for_sending(_, _) -> _. %% @todo -close_bidi_stream_for_sending(StreamRef, State=#http3_machine{streams=Streams}) -> - #{StreamRef := Stream} = Streams, +close_bidi_stream_for_sending(StreamID, State=#http3_machine{streams=Streams}) -> + #{StreamID := Stream} = Streams, stream_store(Stream#stream{local=fin}, State). -spec close_stream(_, _) -> _. %% @todo -close_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> - case maps:take(StreamRef, Streams0) of +close_stream(StreamID, State=#http3_machine{streams=Streams0}) -> + case maps:take(StreamID, Streams0) of {#stream{type=control}, Streams} -> {error, {connection_error, h3_closed_critical_stream, 'The control stream was closed. (RFC9114 6.2.1)'}, @@ -155,29 +149,22 @@ close_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> -spec frame(_, _, _, _) -> _. %% @todo -frame(Frame, IsFin, StreamRef, State) -> +frame(Frame, IsFin, StreamID, State) -> case element(1, Frame) of - data -> data_frame(Frame, IsFin, StreamRef, State); - headers -> headers_frame(Frame, IsFin, StreamRef, State); - cancel_push -> cancel_push_frame(Frame, IsFin, StreamRef, State); - settings -> settings_frame(Frame, IsFin, StreamRef, State); - push_promise -> push_promise_frame(Frame, IsFin, StreamRef, State); - goaway -> goaway_frame(Frame, IsFin, StreamRef, State); - max_push_id -> max_push_id_frame(Frame, IsFin, StreamRef, State) + data -> data_frame(Frame, IsFin, StreamID, State); + headers -> headers_frame(Frame, IsFin, StreamID, State); + cancel_push -> cancel_push_frame(Frame, IsFin, StreamID, State); + settings -> settings_frame(Frame, IsFin, StreamID, State); + push_promise -> push_promise_frame(Frame, IsFin, StreamID, State); + goaway -> goaway_frame(Frame, IsFin, StreamID, State); + max_push_id -> max_push_id_frame(Frame, IsFin, StreamID, State) end. %% DATA frame. -%data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode, -% local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) -% when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID)) -% orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) -> -% {error, {connection_error, protocol_error, -% 'DATA frame received on a stream in idle state. (RFC7540 5.1)'}, -% State}; -data_frame(Frame={data, Data}, IsFin, StreamRef, State) -> +data_frame(Frame={data, Data}, IsFin, StreamID, State) -> DataLen = byte_size(Data), - case stream_get(StreamRef, State) of + case stream_get(StreamID, State) of Stream = #stream{type=req, remote=nofin} -> data_frame(Frame, IsFin, Stream, State, DataLen); #stream{type=req, remote=idle} -> @@ -222,8 +209,8 @@ is_body_size_valid(_) -> %% HEADERS frame. -headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{mode=Mode}) -> - case stream_get(StreamRef, State) of +headers_frame(Frame, IsFin, StreamID, State=#http3_machine{mode=Mode}) -> + case stream_get(StreamID, State) of %% Headers. Stream=#stream{type=req, remote=idle} -> headers_decode(Frame, IsFin, Stream, State, case Mode of @@ -243,9 +230,8 @@ headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{mode=Mode}) -> end. %% @todo Check whether connection_error or stream_error fits better. -headers_decode({headers, EncodedFieldSection}, IsFin, Stream=#stream{ref=StreamRef}, +headers_decode({headers, EncodedFieldSection}, IsFin, Stream=#stream{id=StreamID}, State=#http3_machine{decode_state=DecodeState0}, Type) -> - {ok, StreamID} = quicer:get_stream_id(StreamRef), try cow_qpack:decode_field_section(EncodedFieldSection, StreamID, DecodeState0) of {ok, Headers, DecData, DecodeState} -> headers_pseudo_headers(Stream, @@ -492,8 +478,7 @@ headers_parse_expected_size(Stream, State, IsFin, Type, DecData, end end. -headers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, - IsFin, Type, DecData, PseudoHeaders, Headers, Len) -> +headers_frame(Stream0, State0, IsFin, Type, DecData, PseudoHeaders, Headers, Len) -> Stream = case Type of request -> TE = case lists:keyfind(<<"te">>, 1, Headers) of @@ -513,10 +498,10 @@ headers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, <<>> -> {ok, {headers, IsFin, Headers, PseudoHeaders, Len}, State}; _ -> - {ok, {headers, IsFin, Headers, PseudoHeaders, Len}, {DecoderRef, DecData}, State} + {ok, {headers, IsFin, Headers, PseudoHeaders, Len}, DecData, State} end. -trailers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, DecData, Headers) -> +trailers_frame(Stream0, State0, DecData, Headers) -> Stream = Stream0#stream{remote=fin}, State = stream_store(Stream, State0), %% @todo Error out if we didn't get the full body. @@ -526,21 +511,21 @@ trailers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, Dec <<>> -> {ok, {trailers, Headers}, State}; _ -> - {ok, {trailers, Headers}, {DecoderRef, DecData}, State} + {ok, {trailers, Headers}, DecData, State} end.%; % false -> % stream_reset(StreamID, State, protocol_error, % 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)') % end. -cancel_push_frame(Frame, _IsFin, StreamRef, State) -> - case stream_get(StreamRef, State) of +cancel_push_frame(Frame, _IsFin, StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(Frame, State) end. -settings_frame(Frame, _IsFin, StreamRef, State) -> - case stream_get(StreamRef, State) of +settings_frame(Frame, _IsFin, StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(Frame, State); #stream{type=req} -> @@ -549,14 +534,14 @@ settings_frame(Frame, _IsFin, StreamRef, State) -> State} end. -push_promise_frame(Frame, _IsFin, StreamRef, State) -> - case stream_get(StreamRef, State) of +push_promise_frame(Frame, _IsFin, StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(Frame, State) end. -goaway_frame(Frame, _IsFin, StreamRef, State) -> - case stream_get(StreamRef, State) of +goaway_frame(Frame, _IsFin, StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(Frame, State); #stream{type=req} -> @@ -565,8 +550,8 @@ goaway_frame(Frame, _IsFin, StreamRef, State) -> State} end. -max_push_id_frame(Frame, _IsFin, StreamRef, State) -> - case stream_get(StreamRef, State) of +max_push_id_frame(Frame, _IsFin, StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(Frame, State); #stream{type=req} -> @@ -608,8 +593,8 @@ control_frame(_Frame, State) -> -spec ignored_frame(_, _) -> _. %% @todo -ignored_frame(StreamRef, State) -> - case stream_get(StreamRef, State) of +ignored_frame(StreamID, State) -> + case stream_get(StreamID, State) of #stream{type=control} -> control_frame(ignored_frame, State); _ -> @@ -625,10 +610,9 @@ ignored_frame(StreamRef, State) -> % -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine(). -spec prepare_headers(_, _, _, _, _) -> todo. -prepare_headers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, +prepare_headers(StreamID, State=#http3_machine{encode_state=EncodeState0}, IsFin0, PseudoHeaders, Headers0) -> - {ok, StreamID} = quicer:get_stream_id(StreamRef), - Stream = #stream{method=Method, local=idle} = stream_get(StreamRef, State), + Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State), IsFin = case {IsFin0, Method} of {idle, _} -> nofin; {_, <<"HEAD">>} -> fin; @@ -672,8 +656,8 @@ merge_pseudo_headers(PseudoHeaders, Headers0) -> -spec prepare_trailers(_, _, _) -> todo. -prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Trailers) -> - Stream = #stream{local=nofin, te=TE0} = stream_get(StreamRef, State), +prepare_trailers(StreamID, State=#http3_machine{encode_state=EncodeState0}, Trailers) -> + Stream = #stream{local=nofin, te=TE0} = stream_get(StreamID, State), TE = try cow_http_hd:parse_te(TE0) of {trailers, []} -> trailers; _ -> no_trailers @@ -683,7 +667,6 @@ prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Tra end, case TE of trailers -> - {ok, StreamID} = quicer:get_stream_id(StreamRef), {ok, HeaderBlock, EncData, EncodeState} = cow_qpack:encode_field_section(Trailers, StreamID, EncodeState0), {trailers, HeaderBlock, EncData, stream_store(Stream#stream{local=fin}, State#http3_machine{encode_state=EncodeState})}; @@ -695,8 +678,8 @@ prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Tra -spec reset_stream(_, _) -> todo. -reset_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> - case maps:take(StreamRef, Streams0) of +reset_stream(StreamID, State=#http3_machine{streams=Streams0}) -> + case maps:take(StreamID, Streams0) of {_, Streams} -> {ok, State#http3_machine{streams=Streams}}; error -> @@ -707,8 +690,8 @@ reset_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> -spec get_stream_local_state(_, _) -> todo. -get_stream_local_state(StreamRef, State) -> - case stream_get(StreamRef, State) of +get_stream_local_state(StreamID, State) -> + case stream_get(StreamID, State) of #stream{local=IsFin} -> {ok, IsFin}; %% Stream may never have been opened, or could have @@ -721,8 +704,8 @@ get_stream_local_state(StreamRef, State) -> -spec get_stream_remote_state(_, _) -> todo. -get_stream_remote_state(StreamRef, State) -> - case stream_get(StreamRef, State) of +get_stream_remote_state(StreamID, State) -> + case stream_get(StreamID, State) of #stream{remote=IsFin} -> {ok, IsFin}; %% Stream may never have been opened, or could have @@ -733,14 +716,14 @@ get_stream_remote_state(StreamRef, State) -> %% Stream-related functions. -stream_get(StreamRef, #http3_machine{streams=Streams}) -> - maps:get(StreamRef, Streams, undefined). +stream_get(StreamID, #http3_machine{streams=Streams}) -> + maps:get(StreamID, Streams, undefined). -%stream_store(#stream{ref=StreamRef, local=fin, remote=fin}, +%stream_store(#stream{id=StreamID, local=fin, remote=fin}, % State=#http3_machine{streams=Streams0}) -> % %% @todo We may want to wait for the QUIC message about stream closure. -% Streams = maps:remove(StreamRef, Streams0), +% Streams = maps:remove(StreamID, Streams0), % State#http3_machine{streams=Streams}; -stream_store(Stream=#stream{ref=StreamRef}, +stream_store(Stream=#stream{id=StreamID}, State=#http3_machine{streams=Streams}) -> - State#http3_machine{streams=Streams#{StreamRef => Stream}}. + State#http3_machine{streams=Streams#{StreamID => Stream}}. diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 9038f8687..81bc2e8f2 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -15,10 +15,7 @@ %% A key difference between cowboy_http2 and cowboy_http3 %% is that HTTP/3 streams are QUIC streams and therefore %% much of the connection state is handled outside of -%% Cowboy. The quicer library uses a reference for -%% identifying streams, and we use that same reference -%% for our StreamID internally. The real StreamID can -%% be retrieved via quicer:get_stream_id(StreamRef). +%% Cowboy. -module(cowboy_http3). @@ -30,12 +27,11 @@ -include_lib("quicer/include/quicer.hrl"). -record(stream, { - %% @todo We shouldn't use the QUIC reference because it is a NIF object - %% and if it's stored somewhere by the user it'll never get GC. + id :: non_neg_integer(), %% @todo specs ref :: any(), %% @todo specs %% Whether the stream is currently in a special state. - status :: header | normal | {data, non_neg_integer()} | discard, + status :: header | normal | {data, non_neg_integer()} | stopping, %% Stream buffer. buffer = <<>> :: binary(), @@ -58,7 +54,13 @@ %% HTTP/3 state machine. http3_machine :: cow_http3_machine:http3_machine(), - %% Bidirectional streams are used for requests and responses. + %% Quick pointers for commonly used streams. + local_control_ref :: any(), %% @todo specs Control stream must not be closed. + local_encoder_ref :: any(), %% @todo specs + local_decoder_ref :: any(), %% @todo specs + + %% Bidirectional streams used for requests and responses, + %% as well as unidirectional streams initiated by the client. streams = #{} :: map(), %% @todo specs %% Lingering streams that were recently reset. We may receive @@ -77,16 +79,19 @@ init(Parent, Conn, Opts) -> %% Immediately open a control, encoder and decoder stream. {ok, ControlRef} = quicer:start_stream(Conn, #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), + {ok, ControlID} = quicer:get_stream_id(ControlRef), {ok, _} = quicer:send(ControlRef, [<<0>>, SettingsBin]), {ok, EncoderRef} = quicer:start_stream(Conn, #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), + {ok, EncoderID} = quicer:get_stream_id(EncoderRef), {ok, _} = quicer:send(EncoderRef, <<2>>), {ok, DecoderRef} = quicer:start_stream(Conn, #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), + {ok, DecoderID} = quicer:get_stream_id(DecoderRef), {ok, _} = quicer:send(DecoderRef, <<3>>), %% Set the control, encoder and decoder streams in the machine. HTTP3Machine = cow_http3_machine:init_unidi_local_streams( - ControlRef, EncoderRef, DecoderRef, HTTP3Machine0), + ControlID, EncoderID, DecoderID, HTTP3Machine0), %% Get the peername/sockname. Peer0 = quicer:peername(Conn), Sock0 = quicer:sockname(Conn), @@ -95,7 +100,10 @@ init(Parent, Conn, Opts) -> {{ok, Peer}, {ok, Sock}} -> %% Quick! Let's go! loop(#state{parent=Parent, conn=Conn, opts=Opts, - peer=Peer, sock=Sock, http3_machine=HTTP3Machine}); + peer=Peer, sock=Sock, http3_machine=HTTP3Machine, + local_control_ref=ControlRef, + local_encoder_ref=EncoderRef, + local_decoder_ref=DecoderRef}); {{error, Reason}, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -110,17 +118,20 @@ loop(State0=#state{conn=Conn}) -> %% @todo IsFin is inside Props. But it may not be set once the data was sent. {quic, Data, StreamRef, Props} when is_binary(Data) -> % ct:pal("DATA ~p props ~p", [StreamRef, Props]), - parse(State0, Data, StreamRef, Props); + {ok, StreamID} = quicer:get_stream_id(StreamRef), + parse(State0, Data, StreamID, Props); %% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED {quic, new_stream, StreamRef, #{flags := Flags}} -> % ct:pal("new_stream ~p flags ~p", [StreamRef, Flags]), ok = quicer:setopt(StreamRef, active, true), - State = stream_new_remote(State0, StreamRef, Flags), + {ok, StreamID} = quicer:get_stream_id(StreamRef), + State = stream_new_remote(State0, StreamID, StreamRef, Flags), loop(State); %% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE {quic, stream_closed, StreamRef, Flags} -> % ct:pal("stream_closed ~p flags ~p", [StreamRef, Flags]), - State = stream_closed(State0, StreamRef, Flags), + {ok, StreamID} = quicer:get_stream_id(StreamRef), + State = stream_closed(State0, StreamID, Flags), loop(State); %% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE %% @@ -146,8 +157,8 @@ loop(State0=#state{conn=Conn}) -> {quic, send_shutdown_complete, _StreamRef, _IsGraceful} -> loop(State0); %% Messages pertaining to a stream. - {{Pid, StreamRef}, Msg} when Pid =:= self() -> - loop(info(State0, StreamRef, Msg)); + {{Pid, StreamID}, Msg} when Pid =:= self() -> + loop(info(State0, StreamID, Msg)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> loop(down(State0, Pid, Msg)); @@ -156,34 +167,31 @@ loop(State0=#state{conn=Conn}) -> loop(State0) end. -parse(State=#state{streams=Streams, opts=Opts}, Data, StreamRef, Props) -> - case Streams of - #{StreamRef := Stream=#stream{buffer= <<>>}} -> +parse(State=#state{opts=Opts}, Data, StreamID, Props) -> + case stream_get(State, StreamID) of + Stream=#stream{buffer= <<>>} -> parse1(State, Data, Stream, Props); - #{StreamRef := Stream=#stream{buffer=Buffer}} -> - %% @todo OK we should only keep the StreamRef forward - %% and update the stream in the state here. + Stream=#stream{buffer=Buffer} -> Stream1 = Stream#stream{buffer= <<>>}, - parse1(stream_update(State, Stream1), + parse1(stream_store(State, Stream1), <>, Stream1, Props); %% Pending data for a stream that has been reset. Ignore. - %% @todo Maybe keep a few pending to ignore this and stream process messages. - #{} -> - case is_lingering_stream(State, StreamRef) of + error -> + case is_lingering_stream(State, StreamID) of true -> ok; false -> %% We avoid logging the data as it could be quite large. cowboy:log(warning, "Received data for unknown stream ~p.", - [StreamRef], Opts) + [StreamID], Opts) end, loop(State) end. -%% @todo Swap Data and Stream/StreamRef. +%% @todo Swap Data and Stream/StreamID? parse1(State, Data, Stream=#stream{status=header}, Props) -> parse_unidirectional_stream_header(State, Data, Stream, Props); -parse1(State, Data, Stream=#stream{status={data, Len}, ref=StreamRef}, Props) -> +parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, Props) -> DataLen = byte_size(Data), if DataLen < Len -> @@ -193,15 +201,16 @@ parse1(State, Data, Stream=#stream{status={data, Len}, ref=StreamRef}, Props) -> <> = Data, IsFin = is_fin(Props, Rest), parse(frame(State, Stream#stream{status=normal}, {data, Data1}, IsFin), - Rest, StreamRef, Props) + Rest, StreamID, Props) end; -%% @todo Clause that discards receiving data for aborted streams. -parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) -> +%% @todo Clause that discards receiving data for stopping streams. +%% We may receive a few more frames after we abort receiving. +parse1(State, Data, Stream=#stream{id=StreamID}, Props) -> case cow_http3:parse(Data) of {ok, Frame, Rest} -> IsFin = is_fin(Props, Rest), % ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]), - parse(frame(State, Stream, Frame, IsFin), Rest, StreamRef, Props); + parse(frame(State, Stream, Frame, IsFin), Rest, StreamID, Props); {more, Frame, Len} -> IsFin = is_fin(Props, <<>>), case IsFin of @@ -212,16 +221,16 @@ parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) -> 'Last frame on stream was truncated. (RFC9114 7.1)'}) end; {ignore, Rest} -> - parse(ignored_frame(State, Stream), Rest, StreamRef, Props); + parse(ignored_frame(State, Stream), Rest, StreamID, Props); Error = {connection_error, _, _} -> terminate(State, Error); more when Data =:= <<>> -> - loop(stream_update(State, Stream#stream{buffer=Data})); + loop(stream_store(State, Stream#stream{buffer=Data})); more -> IsFin = is_fin(Props, <<>>), case IsFin of nofin -> - loop(stream_update(State, Stream#stream{buffer=Data})); + loop(stream_store(State, Stream#stream{buffer=Data})); fin -> terminate(State, {connection_error, h3_frame_error, 'Last frame on stream was truncated. (RFC9114 7.1)'}) @@ -239,15 +248,15 @@ is_fin(#{flags := Flags}, Rest) -> end. parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, - Data, Stream0=#stream{ref=StreamRef}, Props) -> + Data, Stream0=#stream{id=StreamID}, Props) -> case cow_http3:parse_unidi_stream_header(Data) of {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder -> case cow_http3_machine:set_unidi_remote_stream_type( - StreamRef, Type, HTTP3Machine0) of + StreamID, Type, HTTP3Machine0) of {ok, HTTP3Machine} -> State = State0#state{http3_machine=HTTP3Machine}, Stream = Stream0#stream{status=normal}, - parse(stream_update(State, Stream), Rest, StreamRef, Props); + parse(stream_store(State, Stream), Rest, StreamID, Props); {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State0#state{http3_machine=HTTP3Machine}, Error) end; @@ -260,9 +269,10 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)) end. -frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, Frame, IsFin) -> +frame(State=#state{http3_machine=HTTP3Machine0, local_decoder_ref=DecoderRef}, + Stream=#stream{id=StreamID}, Frame, IsFin) -> % ct:pal("cowboy frame ~p ~p", [Frame, IsFin]), - case cow_http3_machine:frame(Frame, IsFin, StreamRef, HTTP3Machine0) of + case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of {ok, HTTP3Machine} -> State#state{http3_machine=HTTP3Machine}; {ok, {data, Data}, HTTP3Machine} -> @@ -271,8 +281,7 @@ frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP3Machine} -> headers_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Headers, PseudoHeaders, BodyLen); - {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, - {DecoderRef, DecData}, HTTP3Machine} -> + {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, DecData, HTTP3Machine} -> %% Send the decoder data. {ok, _} = quicer:send(DecoderRef, DecData), headers_frame(State#state{http3_machine=HTTP3Machine}, @@ -283,40 +292,46 @@ frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, {ok, GoAway={goaway, _}, HTTP3Machine} -> goaway(State#state{http3_machine=HTTP3Machine}, GoAway); {error, Error={stream_error, _Reason, _Human}, HTTP3Machine} -> - reset_stream(State#state{http3_machine=HTTP3Machine}, StreamRef, Error); + reset_stream(State#state{http3_machine=HTTP3Machine}, Stream, Error); {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State#state{http3_machine=HTTP3Machine}, Error) end. -data_frame(State=#state{opts=Opts, streams=Streams}, - Stream=#stream{ref=StreamRef, state=StreamState0}, IsFin, Data) -> - try cowboy_stream:data(StreamRef, IsFin, Data, StreamState0) of +data_frame(State=#state{opts=Opts}, + Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) -> + try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State#state{ - streams=Streams#{StreamRef => Stream#stream{state=StreamState}}}, - StreamRef, Commands) + commands(State, Stream#stream{state=StreamState}, Commands) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(data, - [StreamRef, IsFin, Data, StreamState0], + [StreamID, IsFin, Data, StreamState0], Class, Exception, Stacktrace), Opts), - reset_stream(State, StreamRef, {internal_error, {Class, Exception}, + reset_stream(State, Stream, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end. -%% @todo CONNECT, TRACE. +headers_frame(State, Stream, IsFin, Headers, + PseudoHeaders=#{method := <<"CONNECT">>}, _) + when map_size(PseudoHeaders) =:= 2 -> + early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501, + 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'); +headers_frame(State, Stream, IsFin, Headers, + PseudoHeaders=#{method := <<"TRACE">>}, _) -> + early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501, + 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)'); headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) -> headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority); -headers_frame(State, Stream=#stream{ref=StreamRef}, IsFin, Headers, PseudoHeaders, BodyLen) -> +headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) -> case lists:keyfind(<<"host">>, 1, Headers) of {_, Authority} -> headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority); _ -> - reset_stream(State, StreamRef, {stream_error, h3_message_error, + reset_stream(State, Stream, {stream_error, h3_message_error, 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'}) end. headers_frame_parse_host(State=#state{peer=Peer, sock=Sock}, - Stream=#stream{ref=StreamRef}, IsFin, Headers, + Stream=#stream{id=StreamID}, IsFin, Headers, #{method := Method, scheme := Scheme, path := PathWithQs}, BodyLen, Authority) -> try cow_http_hd:parse_host(Authority) of @@ -324,13 +339,13 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock}, Port = ensure_port(Scheme, Port0), try cow_http:parse_fullpath(PathWithQs) of {<<>>, _} -> - reset_stream(State, StreamRef, {stream_error, h3_message_error, + reset_stream(State, Stream, {stream_error, h3_message_error, 'The path component must not be empty. (RFC7540 8.1.2.3)'}); {Path, Qs} -> Req = #{ ref => quic, %% @todo Ref, pid => self(), - streamid => StreamRef, + streamid => StreamID, peer => Peer, sock => Sock, cert => undefined, %Cert, %% @todo @@ -352,11 +367,11 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock}, % end, headers_frame(State, Stream, Req) catch _:_ -> - reset_stream(State, StreamRef, {stream_error, h3_message_error, + reset_stream(State, Stream, {stream_error, h3_message_error, 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'}) end catch _:_ -> - reset_stream(State, StreamRef, {stream_error, h3_message_error, + reset_stream(State, Stream, {stream_error, h3_message_error, 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'}) end. @@ -383,24 +398,60 @@ headers_to_map([{Name, Value}|Tail], Acc0) -> end, headers_to_map(Tail, Acc). -headers_frame(State=#state{opts=Opts, streams=Streams}, - Stream=#stream{ref=StreamRef}, Req) -> +headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) -> ct:pal("req ~p", [Req]), - try cowboy_stream:init(StreamRef, Req, Opts) of + try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> %logger:error("~p", [Commands]), %logger:error("~p", [StreamState]), - commands(State#state{ - streams=Streams#{StreamRef => Stream#stream{state=StreamState}}}, - StreamRef, Commands) + commands(State, Stream#stream{state=StreamState}, Commands) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(init, - [StreamRef, Req, Opts], + [StreamID, Req, Opts], Class, Exception, Stacktrace), Opts), - reset_stream(State, StreamRef, {internal_error, {Class, Exception}, + reset_stream(State, Stream, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:init/3.'}) end. + + + + + +early_error(State0=#state{opts=Opts, peer=Peer}, + Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method}, + StatusCode0, HumanReadable) -> + %% We automatically terminate the stream but it is not an error + %% per se (at least not in the first implementation). + Reason = {stream_error, h3_no_error, HumanReadable}, + %% The partial Req is minimal for now. We only have one case + %% where it can be called (when a method is completely disabled). + %% @todo Fill in the other elements. + PartialReq = #{ + ref => quic, %% @todo Ref, + peer => Peer, + method => Method, + headers => headers_to_map(Headers, #{}) + }, + Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>}, + try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of + {response, StatusCode, RespHeaders, RespBody} -> + send_response(State0, Stream, StatusCode, RespHeaders, RespBody) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(early_error, + [StreamID, Reason, PartialReq, Resp, Opts], + Class, Exception, Stacktrace), Opts), + %% We still need to send an error response, so send what we initially + %% wanted to send. It's better than nothing. + send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0) + end. + + + + + + + %% Erlang messages. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> @@ -409,8 +460,8 @@ down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> {ok, undefined, Children} -> State0#state{children=Children}; %% The stream is still running. - {ok, StreamRef, Children} -> - info(State0#state{children=Children}, StreamRef, Msg); + {ok, StreamID, Children} -> + info(State0#state{children=Children}, StreamID, Msg); %% The process was unknown. error -> cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n", @@ -425,64 +476,62 @@ down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> State end. -info(State=#state{opts=Opts, http3_machine=_HTTP3Machine, streams=Streams}, StreamRef, Msg) -> +info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) -> %ct:pal("INFO ~p", [Msg]), - case Streams of - #{StreamRef := Stream=#stream{state=StreamState0}} -> - try cowboy_stream:info(StreamRef, Msg, StreamState0) of + case stream_get(State, StreamID) of + Stream=#stream{state=StreamState0} -> + try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> %ct:pal("~p", [Commands]), -%logger:error("~p ~p", [StreamRef, Streams]), - commands(State#state{streams=Streams#{StreamRef => Stream#stream{state=StreamState}}}, - StreamRef, Commands) +%logger:error("~p ~p", [StreamID, Streams]), + commands(State, Stream#stream{state=StreamState}, Commands) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(info, - [StreamRef, Msg, StreamState0], + [StreamID, Msg, StreamState0], Class, Exception, Stacktrace), Opts), - reset_stream(State, StreamRef, {internal_error, {Class, Exception}, + reset_stream(State, Stream, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:info/3.'}) end; - _ -> - case is_lingering_stream(State, StreamRef) of + error -> + case is_lingering_stream(State, StreamID) of true -> ok; false -> cowboy:log(warning, "Received message ~p for unknown stream ~p.", - [Msg, StreamRef], Opts) + [Msg, StreamID], Opts) end, State end. %% Stream handler commands. -commands(State, _, []) -> - State; +commands(State, Stream, []) -> + stream_store(State, Stream); %% Error responses are sent only if a response wasn't sent already. -commands(State=#state{http3_machine=HTTP3Machine}, StreamRef, +commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}, [{error_response, StatusCode, Headers, Body}|Tail]) -> - %% @todo -% case cow_http2_machine:get_stream_local_state(StreamRef, HTTP2Machine) of -% {ok, idle, _} -> - commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]); -% _ -> -% commands(State, StreamRef, Tail) -% end; + case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of + {ok, idle} -> + commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]); + _ -> + commands(State, Stream, Tail) + end; %% Send an informational response. -commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamRef, idle, StatusCode, Headers), - commands(State, StreamRef, Tail); +commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) -> + State = send_headers(State0, Stream, idle, StatusCode, Headers), + commands(State, Stream, Tail); %% Send response headers. -commands(State0, StreamRef, [{response, StatusCode, Headers, Body}|Tail]) -> +commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) -> ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]), - State = send_response(State0, StreamRef, StatusCode, Headers, Body), - commands(State, StreamRef, Tail); + State = send_response(State0, Stream, StatusCode, Headers, Body), + commands(State, Stream, Tail); %% Send response headers. -commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) -> +commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) -> ct:pal("commands headers ~p ~p", [StatusCode, Headers]), - State = send_headers(State0, StreamRef, nofin, StatusCode, Headers), - commands(State, StreamRef, Tail); + State = send_headers(State0, Stream, nofin, StatusCode, Headers), + commands(State, Stream, Tail); %%% Send a response body chunk. -commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) -> +commands(State0, Stream=#stream{ref=StreamRef}, [{data, IsFin, Data}|Tail]) -> ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]), _ = case Data of {sendfile, Offset, Bytes, Path} -> @@ -493,13 +542,15 @@ commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) -> _ -> {ok, _} = quicer:send(StreamRef, cow_http3:data(Data), send_flag(IsFin)) end, - State = maybe_send_is_fin(State0, StreamRef, IsFin), - commands(State, StreamRef, Tail); + State = maybe_send_is_fin(State0, Stream, IsFin), + commands(State, Stream, Tail); %%% Send trailers. -commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trailers}|Tail]) -> +commands(State=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID, ref=StreamRef}, + [{trailers, Trailers}|Tail]) -> ct:pal("commands trailers ~p", [Trailers]), HTTP3Machine = case cow_http3_machine:prepare_trailers( - StreamRef, HTTP3Machine0, maps:to_list(Trailers)) of + StreamID, HTTP3Machine0, maps:to_list(Trailers)) of {trailers, HeaderBlock, _EncData, HTTP3Machine1} -> ct:pal("trailers"), %% @todo EncData!! @@ -510,7 +561,7 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)), HTTP3Machine1 end, - commands(State#state{http3_machine=HTTP3Machine}, StreamRef, Tail); + commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); %% Send a push promise. %% %% @todo Responses sent as a result of a push_promise request @@ -519,7 +570,7 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail %% @todo We should not send push_promise frames when we are %% in the closing http2_status. %commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0}, -% StreamRef, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) -> +% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) -> % Authority = case {Scheme, Port} of % {<<"http">>, 80} -> Host; % {<<"https">>, 443} -> Host; @@ -539,75 +590,77 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail % %% create the Req object, as it expects them to be flat. % Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)), % %% @todo -% State = case cow_http2_machine:prepare_push_promise(StreamRef, HTTP3Machine0, +% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0, % PseudoHeaders, Headers) of -% {ok, PromisedStreamRef, HeaderBlock, HTTP3Machine} -> +% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} -> % Transport:send(Socket, cow_http2:push_promise( -% StreamRef, PromisedStreamRef, HeaderBlock)), +% StreamID, PromisedStreamID, HeaderBlock)), % headers_frame(State0#state{http3_machine=HTTP2Machine}, -% PromisedStreamRef, fin, Headers, PseudoHeaders, 0); +% PromisedStreamID, fin, Headers, PseudoHeaders, 0); % {error, no_push} -> % State0 % end, -% commands(State, StreamRef, Tail); +% commands(State, Stream, Tail); %%% Read the request body. -%commands(State0=#state{flow=Flow, streams=Streams}, StreamRef, [{flow, Size}|Tail]) -> -commands(State, StreamRef, [{flow, _Size}|Tail]) -> +%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) -> +commands(State, Stream, [{flow, _Size}|Tail]) -> %% @todo We should tell the QUIC stream to increase its window size. -% #{StreamRef := Stream=#stream{flow=StreamFlow}} = Streams, +% #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, % State = update_window(State0#state{flow=Flow + Size, -% streams=Streams#{StreamRef => Stream#stream{flow=StreamFlow + Size}}}, -% StreamRef), - commands(State, StreamRef, Tail); +% streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, +% StreamID), + commands(State, Stream, Tail); %% Supervise a child process. -commands(State=#state{children=Children}, StreamRef, [{spawn, Pid, Shutdown}|Tail]) -> - commands(State#state{children=cowboy_children:up(Children, Pid, StreamRef, Shutdown)}, - StreamRef, Tail); +commands(State=#state{children=Children}, Stream=#stream{id=StreamID}, + [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + Stream, Tail); %% Error handling. -%commands(State, StreamRef, [Error = {internal_error, _, _}|_Tail]) -> -% %% @todo Do we want to run the commands after an internal_error? -% %% @todo Do we even allow commands after? -% %% @todo Only reset when the stream still exists. -% reset_stream(State, StreamRef, Error); +commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> + %% @todo Do we want to run the commands after an internal_error? + %% @todo Do we even allow commands after? + %% @todo Only reset when the stream still exists. + reset_stream(State, Stream, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. %commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, -% StreamRef, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> +% Stream, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> % %% @todo This 101 response needs to be passed through stream handlers. % Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), -% commands(State, StreamRef, Tail); +% commands(State, Stream, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. -%commands(State0, StreamRef, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> -% State = info(State0, StreamRef, {headers, 200, Headers}), -% commands(State, StreamRef, Tail); +%commands(State0, Stream, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> +% State = info(State0, Stream, {headers, 200, Headers}), +% commands(State, Stream, Tail); %% Set options dynamically. -commands(State, StreamRef, [{set_options, _Opts}|Tail]) -> - commands(State, StreamRef, Tail); -commands(State, StreamRef, [stop|_Tail]) -> +commands(State, Stream, [{set_options, _Opts}|Tail]) -> + commands(State, Stream, Tail); +commands(State, Stream, [stop|_Tail]) -> ct:pal("stop"), %% @todo Do we want to run the commands after a stop? %% @todo Do we even allow commands after? - stop_stream(State, StreamRef); + stop_stream(stream_store(State, Stream), Stream); %% Log event. -commands(State=#state{opts=Opts}, StreamRef, [Log={log, _, _, _}|Tail]) -> +commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), - commands(State, StreamRef, Tail). + commands(State, Stream, Tail). -send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, Headers, Body) -> +send_response(State0=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID, ref=StreamRef}, StatusCode, Headers, Body) -> Size = case Body of {sendfile, _, Bytes0, _} -> Bytes0; _ -> iolist_size(Body) end, case Size of 0 -> - State = send_headers(State0, StreamRef, fin, StatusCode, Headers), - maybe_send_is_fin(State, StreamRef, fin); + State = send_headers(State0, Stream, fin, StatusCode, Headers), + maybe_send_is_fin(State, Stream, fin); _ -> %% @todo Add a test for HEAD to make sure we don't send the body when %% returning {response...} from a stream handler (or {headers...} then {data...}). %% @todo We must send EncData! {ok, _IsFin, HeaderBlock, _EncData, HTTP3Machine} - = cow_http3_machine:prepare_headers(StreamRef, HTTP3Machine0, nofin, + = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), %% @todo It might be better to do async sends. @@ -624,12 +677,13 @@ send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, cow_http3:data(Body) ], send_flag(fin)) end, - maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, StreamRef, fin) + maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, Stream, fin) end. -maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, StreamRef, fin) -> - HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamRef, HTTP3Machine0), - State#state{http3_machine=HTTP3Machine}; +maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, fin) -> + HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0), + maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream); maybe_send_is_fin(State, _, _) -> State. @@ -641,9 +695,9 @@ send(StreamRef, IoData) -> end. send_headers(State=#state{http3_machine=HTTP3Machine0}, - StreamRef, IsFin0, StatusCode, Headers) -> + #stream{id=StreamID, ref=StreamRef}, IsFin0, StatusCode, Headers) -> {ok, IsFin, HeaderBlock, _EncData, HTTP3Machine} - = cow_http3_machine:prepare_headers(StreamRef, HTTP3Machine0, IsFin0, + = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), {ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(IsFin)), @@ -660,7 +714,8 @@ headers_to_list(Headers) -> send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE; send_flag(fin) -> ?QUIC_SEND_FLAG_FIN. -reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) -> +reset_stream(State0=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID, ref=StreamRef}, Error) -> Reason = case Error of {internal_error, _, _} -> h3_internal_error; {stream_error, Reason0, _} -> Reason0 @@ -669,11 +724,11 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) -> %% @todo Should we close the send side if the receive side was already closed? quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, cow_http3:error_to_code(Reason), infinity), - State1 = case cow_http3_machine:reset_stream(StreamRef, HTTP3Machine0) of + State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of {ok, HTTP3Machine} -> - terminate_stream(State0#state{http3_machine=HTTP3Machine}, StreamRef, Error); + terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error); {error, not_found} -> - terminate_stream(State0, StreamRef, Error) + terminate_stream(State0, Stream, Error) end, %% @todo % case reset_rate(State1) of @@ -685,55 +740,58 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) -> % end. State1. -stop_stream(State0=#state{http3_machine=HTTP3Machine, streams=Streams}, StreamRef) -> - #{StreamRef := Stream} = Streams, +stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) -> %% We abort reading when stopping the stream but only %% if the client was not finished sending data. - State = case cow_http3_machine:get_stream_remote_state(StreamRef, HTTP3Machine) of + %% We mark the stream as 'stopping' either way. + State = case cow_http3_machine:get_stream_remote_state(StreamID, HTTP3Machine) of {ok, fin} -> - State0; + stream_store(State0, Stream#stream{status=stopping}); _ -> stream_abort_receive(State0, Stream, h3_no_error) end, %% Then we may need to send a response or terminate it %% if the stream handler did not do so already. - case cow_http3_machine:get_stream_local_state(StreamRef, HTTP3Machine) of + case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of %% When the stream terminates normally (without resetting the stream) %% and no response was sent, we need to send a proper response back to the client. {ok, idle} -> - info(State, StreamRef, {response, 204, #{}, <<>>}); + info(State, StreamID, {response, 204, #{}, <<>>}); %% When a response was sent but not terminated, we need to close the stream. %% We send a final DATA frame to complete the stream. {ok, nofin} -> ct:pal("error nofin"), - info(State, StreamRef, {data, fin, <<>>}); + info(State, StreamID, {data, fin, <<>>}); %% When a response was sent fully we can terminate the stream, %% regardless of the stream being in half-closed or closed state. _ -> - terminate_stream(State, StreamRef, h3_no_error) + terminate_stream(State, Stream, normal) end. -terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamRef, Reason) -> - case maps:take(StreamRef, Streams0) of - {#stream{state=StreamState}, Streams} -> - terminate_stream_handler(State, StreamRef, Reason, StreamState), - Children = cowboy_children:shutdown(Children0, StreamRef), - stream_linger(State#state{streams=Streams, children=Children}, StreamRef); - error -> - State - end. +maybe_terminate_stream(State, Stream=#stream{status=stopping}) -> + terminate_stream(State, Stream, normal); +%% The Stream will be stored in the State at the end of commands processing. +maybe_terminate_stream(State, _) -> + State. + +terminate_stream(State=#state{streams=Streams0, children=Children0}, + #stream{id=StreamID, state=StreamState}, Reason) -> + Streams = maps:remove(StreamID, Streams0), + terminate_stream_handler(State, StreamID, Reason, StreamState), + Children = cowboy_children:shutdown(Children0, StreamID), + stream_linger(State#state{streams=Streams, children=Children}, StreamID). -terminate_stream_handler(#state{opts=Opts}, StreamRef, Reason, StreamState) -> +terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> try - cowboy_stream:terminate(StreamRef, Reason, StreamState) + cowboy_stream:terminate(StreamID, Reason, StreamState) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(terminate, - [StreamRef, Reason, StreamState], + [StreamID, Reason, StreamState], Class, Exception, Stacktrace), Opts) end. -ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef}) -> - case cow_http3_machine:ignored_frame(StreamRef, HTTP3Machine0) of +ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) -> + case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of {ok, HTTP3Machine} -> State#state{http3_machine=HTTP3Machine}; {error, Error={connection_error, _, _}, HTTP3Machine} -> @@ -743,7 +801,7 @@ ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef}) stream_abort_receive(State, Stream=#stream{ref=StreamRef}, Reason) -> quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, cow_http3:error_to_code(Reason), infinity), - stream_update(State, Stream#stream{status=discard}). + stream_store(State, Stream#stream{status=stopping}). %% @todo Graceful connection shutdown. %% We terminate the connection immediately if it hasn't fully been initialized. @@ -785,39 +843,42 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas +stream_get(#state{streams=Streams}, StreamID) -> + maps:get(StreamID, Streams, error). -stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, StreamRef, Flags) -> +stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, + StreamID, StreamRef, Flags) -> {HTTP3Machine, Status} = case quicer:is_unidirectional(Flags) of true -> - {cow_http3_machine:init_unidi_stream(StreamRef, unidi_remote, HTTP3Machine0), + {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0), header}; false -> - {cow_http3_machine:init_bidi_stream(StreamRef, HTTP3Machine0), + {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0), normal} end, - Stream = #stream{ref=StreamRef, status=Status}, + Stream = #stream{id=StreamID, ref=StreamRef, status=Status}, % ct:pal("new stream ~p ~p", [Stream, HTTP3Machine]), - State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamRef => Stream}}. + State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. stream_closed(State=#state{http3_machine=HTTP3Machine0, streams=Streams0}, - StreamRef, _Flags) -> - case cow_http3_machine:close_stream(StreamRef, HTTP3Machine0) of + StreamID, _Flags) -> + case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of {ok, HTTP3Machine} -> %% @todo Some streams may not be bidi or remote. - Streams = maps:remove(StreamRef, Streams0), - %% @todo terminate stream + Streams = maps:remove(StreamID, Streams0), + %% @todo terminate stream if necessary State#state{http3_machine=HTTP3Machine, streams=Streams}; {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State#state{http3_machine=HTTP3Machine}, Error) end. -stream_update(State=#state{streams=Streams}, Stream=#stream{ref=StreamRef}) -> - State#state{streams=Streams#{StreamRef => Stream}}. +stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) -> + State#state{streams=Streams#{StreamID => Stream}}. -stream_linger(State=#state{lingering_streams=Lingering0}, StreamRef) -> +stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) -> %% We only keep up to 100 streams in this state. @todo Make it configurable? - Lingering = [StreamRef|lists:sublist(Lingering0, 100 - 1)], + Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)], State#state{lingering_streams=Lingering}. -is_lingering_stream(#state{lingering_streams=Lingering}, StreamRef) -> - lists:member(StreamRef, Lingering). +is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) -> + lists:member(StreamID, Lingering). diff --git a/test/compress_SUITE.erl b/test/compress_SUITE.erl index a25c427e5..2454203f6 100644 --- a/test/compress_SUITE.erl +++ b/test/compress_SUITE.erl @@ -28,6 +28,7 @@ all() -> {group, https_compress}, {group, h2_compress}, {group, h2c_compress} + %% @todo h3_compress ]. groups() -> diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl index 74a259f20..6c974d179 100644 --- a/test/metrics_SUITE.erl +++ b/test/metrics_SUITE.erl @@ -44,6 +44,8 @@ init_per_group(Name = h2, Config) -> init_per_group(Name = h2c, Config) -> Config1 = cowboy_test:init_http(Name, init_plain_opts(Config), Config), lists:keyreplace(protocol, 1, Config1, {protocol, http2}); +init_per_group(Name = h3, Config) -> + cowboy_test:init_http3(Name, init_plain_opts(Config), Config); init_per_group(Name = http_compress, Config) -> cowboy_test:init_http(Name, init_compress_opts(Config), Config); init_per_group(Name = https_compress, Config) -> @@ -157,16 +159,24 @@ do_get(Path, UserData, Config) -> #{ ref := _, pid := From, - streamid := 1, - reason := normal, + streamid := StreamID, + reason := normal, %% @todo Getting h3_no_error here. req := #{}, informational := [], user_data := UserData } = Metrics, + do_check_streamid(StreamID, Config), %% All good! gun:close(ConnPid) end. +do_check_streamid(StreamID, Config) -> + case config(protocol, Config) of + http -> 1 = StreamID; + http2 -> 1 = StreamID; + http3 -> 0 = StreamID + end. + post_body(Config) -> doc("Confirm metrics are correct for a normal POST request."), %% Perform a POST request. @@ -218,12 +228,13 @@ post_body(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := normal, req := #{}, informational := [], user_data := #{} } = Metrics, + do_check_streamid(StreamID, Config), %% All good! gun:close(ConnPid) end. @@ -273,12 +284,13 @@ no_resp_body(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := normal, req := #{}, informational := [], user_data := #{} } = Metrics, + do_check_streamid(StreamID, Config), %% All good! gun:close(ConnPid) end. @@ -291,7 +303,8 @@ early_error(Config) -> %% reason in both protocols. {Method, Headers, Status, Error} = case config(protocol, Config) of http -> {<<"GET">>, [{<<"host">>, <<"host:port">>}], 400, protocol_error}; - http2 -> {<<"TRACE">>, [], 501, no_error} + http2 -> {<<"TRACE">>, [], 501, no_error}; + http3 -> {<<"TRACE">>, [], 501, h3_no_error} end, Ref = gun:request(ConnPid, Method, "/", [ {<<"accept-encoding">>, <<"gzip">>}, @@ -305,7 +318,7 @@ early_error(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := {stream_error, Error, _}, partial_req := #{}, resp_status := Status, @@ -313,6 +326,7 @@ early_error(Config) -> early_error_time := _, resp_body_length := 0 } = Metrics, + do_check_streamid(StreamID, Config), ExpectedRespHeaders = maps:from_list(RespHeaders), %% All good! gun:close(ConnPid) @@ -321,7 +335,8 @@ early_error(Config) -> early_error_request_line(Config) -> case config(protocol, Config) of http -> do_early_error_request_line(Config); - http2 -> doc("There are no request lines in HTTP/2.") + http2 -> doc("There are no request lines in HTTP/2."); + http3 -> doc("There are no request lines in HTTP/3.") end. do_early_error_request_line(Config) -> @@ -341,7 +356,7 @@ do_early_error_request_line(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := {connection_error, protocol_error, _}, partial_req := #{}, resp_status := 400, @@ -349,6 +364,7 @@ do_early_error_request_line(Config) -> early_error_time := _, resp_body_length := 0 } = Metrics, + do_check_streamid(StreamID, Config), ExpectedRespHeaders = maps:from_list(RespHeaders), %% All good! ok @@ -405,7 +421,7 @@ do_ws(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := switch_protocol, req := #{}, %% A 101 upgrade response was sent. @@ -420,6 +436,7 @@ do_ws(Config) -> }], user_data := #{} } = Metrics, + do_check_streamid(StreamID, Config), %% All good! ok end, @@ -476,12 +493,13 @@ error_response(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := {internal_error, {'EXIT', _Pid, {crash, _StackTrace}}, 'Stream process crashed.'}, req := #{}, informational := [], user_data := #{} } = Metrics, + do_check_streamid(StreamID, Config), %% All good! gun:close(ConnPid) end. @@ -533,12 +551,13 @@ error_response_after_reply(Config) -> #{ ref := _, pid := From, - streamid := 1, + streamid := StreamID, reason := {internal_error, {'EXIT', _Pid, {crash, _StackTrace}}, 'Stream process crashed.'}, req := #{}, informational := [], user_data := #{} } = Metrics, + do_check_streamid(StreamID, Config), %% All good! gun:close(ConnPid) end. diff --git a/test/rfc9114_SUITE.erl b/test/rfc9114_SUITE.erl index 668b872f2..1f4624dee 100644 --- a/test/rfc9114_SUITE.erl +++ b/test/rfc9114_SUITE.erl @@ -162,70 +162,6 @@ req_stream_two_requests(Config) -> #{reason := h3_message_error} = do_wait_stream_aborted(StreamRef), ok. -req_stream_two_requests_sequential(Config) -> - doc("Receipt of multiple requests on a single stream must " - "be rejected with an H3_MESSAGE_ERROR stream error. " - "(RFC9114 4.1, RFC9114 4.1.2)"), - {ok, Conn} = quicer:connect("localhost", config(port, Config), - #{alpn => ["h3"], verify => none}, 5000), - %% To make sure the connection is fully established we wait - %% to receive the SETTINGS frame on the control stream. - {ok, ControlRef, _Settings} = do_wait_settings(Conn), - %% Send a first request. - {ok, StreamRef} = quicer:start_stream(Conn, #{}), - {ok, EncodedRequest1, _EncData1, EncSt0} = cow_qpack:encode_field_section([ - {<<":method">>, <<"GET">>}, - {<<":scheme">>, <<"https">>}, - {<<":authority">>, <<"localhost">>}, - {<<":path">>, <<"/">>}, - {<<"content-length">>, <<"0">>} - ], 0, cow_qpack:init()), - {ok, _} = quicer:send(StreamRef, [ - <<1>>, %% HEADERS frame. - cow_http3:encode_int(iolist_size(EncodedRequest1)), - EncodedRequest1 - ]), - %% Receive the response. - {ok, Data} = do_receive_data(StreamRef), - {HLenEnc, HLenBits} = do_guess_int_encoding(Data), - << - 1, %% HEADERS frame. - HLenEnc:2, HLen:HLenBits, - EncodedResponse:HLen/bytes, - Rest/bits - >> = Data, - {ok, DecodedResponse, _DecData, _DecSt} - = cow_qpack:decode_field_section(EncodedResponse, 0, cow_qpack:init()), - #{ - <<":status">> := <<"200">>, - <<"content-length">> := BodyLen - } = maps:from_list(DecodedResponse), - {DLenEnc, DLenBits} = do_guess_int_encoding(Data), - << - 0, %% DATA frame. - DLenEnc:2, DLen:DLenBits, - Body:DLen/bytes - >> = Rest, - <<"Hello world!">> = Body, - BodyLen = integer_to_binary(byte_size(Body)), - ok = do_wait_peer_send_shutdown(StreamRef), - %% Send a second request. - {ok, EncodedRequest2, _EncData2, _EncSt} = cow_qpack:encode_field_section([ - {<<":method">>, <<"GET">>}, - {<<":scheme">>, <<"https">>}, - {<<":authority">>, <<"localhost">>}, - {<<":path">>, <<"/">>}, - {<<"content-length">>, <<"0">>} - ], 0, EncSt0), - {ok, _} = quicer:send(StreamRef, [ - <<1>>, %% HEADERS frame. - cow_http3:encode_int(iolist_size(EncodedRequest2)), - EncodedRequest2 - ]), - %% The stream should have been aborted. - #{reason := h3_message_error} = do_wait_stream_aborted(StreamRef), - ok. - headers_then_trailers(Config) -> doc("Receipt of HEADERS followed by trailer HEADERS must be accepted. (RFC9114 4.1)"), #{conn := Conn} = do_connect(Config),