Skip to content

Commit

Permalink
More WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
essen committed Oct 11, 2023
1 parent 9f47774 commit b0b182e
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 348 deletions.
151 changes: 67 additions & 84 deletions src/cow_http3_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
-export([get_stream_remote_state/2]).

-record(stream, {
ref :: any(), %% @todo specs
id :: non_neg_integer(),
dir :: unidi_local | unidi_remote | bidi,
type :: undefined | req | control | push | encoder | decoder,

Expand Down Expand Up @@ -65,13 +65,9 @@
%% Maximum Push ID.
max_push_id = -1 :: -1 | non_neg_integer(),

%% Quick pointers for commonly used streams.
local_encoder_ref :: any(), %% @todo specs
local_decoder_ref :: any(), %% @todo specs

%% Currently active HTTP/3 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{reference() => stream()},
streams = #{} :: #{non_neg_integer() => stream()},
%% @todo Maybe merge these two in some kind of control stream state.
has_peer_control_stream = false :: boolean(),
has_received_peer_settings = false :: boolean(),
Expand All @@ -88,63 +84,61 @@ init(Mode, _Opts) ->

-spec init_unidi_local_streams(_, _ ,_ ,_) -> _. %% @todo

init_unidi_local_streams(ControlRef, EncoderRef, DecoderRef,
init_unidi_local_streams(ControlID, EncoderID, DecoderID,
State=#http3_machine{streams=Streams}) ->
State#http3_machine{
local_encoder_ref=EncoderRef,
local_decoder_ref=DecoderRef,
streams=Streams#{
ControlRef => #stream{ref=ControlRef, dir=unidi_local, type=control},
EncoderRef => #stream{ref=EncoderRef, dir=unidi_local, type=encoder},
DecoderRef => #stream{ref=DecoderRef, dir=unidi_local, type=decoder}
ControlID => #stream{id=ControlID, dir=unidi_local, type=control},
EncoderID => #stream{id=EncoderID, dir=unidi_local, type=encoder},
DecoderID => #stream{id=DecoderID, dir=unidi_local, type=decoder}
}}.

-spec init_unidi_stream(_, _, _) -> _. %% @todo

init_unidi_stream(StreamRef, StreamDir, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamRef => #stream{
ref=StreamRef, dir=StreamDir, type=undefined}}}.
init_unidi_stream(StreamID, StreamDir, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamID => #stream{
id=StreamID, dir=StreamDir, type=undefined}}}.

-spec set_unidi_remote_stream_type(_, _, _) -> _. %% @todo

set_unidi_remote_stream_type(_, control, State=#http3_machine{has_peer_control_stream=true}) ->
{error, {connection_error, h3_stream_creation_error,
'There can be only one control stream; yet a second one was opened. (RFC9114 6.2.1)'},
State};
set_unidi_remote_stream_type(StreamRef, Type,
set_unidi_remote_stream_type(StreamID, Type,
State=#http3_machine{streams=Streams, has_peer_control_stream=HasControl}) ->
#{StreamRef := Stream} = Streams,
#{StreamID := Stream} = Streams,
{ok, State#http3_machine{
streams=Streams#{StreamRef => Stream#stream{type=Type}},
streams=Streams#{StreamID => Stream#stream{type=Type}},
has_peer_control_stream=HasControl orelse (Type =:= control)
}}.

-spec init_bidi_stream(_, _) -> _. %% @todo

%% All bidi streams are request/response.
init_bidi_stream(StreamRef, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamRef => #stream{
ref=StreamRef, dir=bidi, type=req}}}.
init_bidi_stream(StreamID, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamID => #stream{
id=StreamID, dir=bidi, type=req}}}.

-spec init_bidi_stream(_, _, _) -> _. %% @todo

%% All bidi streams are request/response.
init_bidi_stream(StreamRef, Method, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamRef => #stream{
ref=StreamRef, dir=bidi, type=req, method=Method}}}.
init_bidi_stream(StreamID, Method, State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamID => #stream{
id=StreamID, dir=bidi, type=req, method=Method}}}.

%% @todo set_bidi_method?

-spec close_bidi_stream_for_sending(_, _) -> _. %% @todo

close_bidi_stream_for_sending(StreamRef, State=#http3_machine{streams=Streams}) ->
#{StreamRef := Stream} = Streams,
close_bidi_stream_for_sending(StreamID, State=#http3_machine{streams=Streams}) ->
#{StreamID := Stream} = Streams,
stream_store(Stream#stream{local=fin}, State).

-spec close_stream(_, _) -> _. %% @todo

close_stream(StreamRef, State=#http3_machine{streams=Streams0}) ->
case maps:take(StreamRef, Streams0) of
close_stream(StreamID, State=#http3_machine{streams=Streams0}) ->
case maps:take(StreamID, Streams0) of
{#stream{type=control}, Streams} ->
{error, {connection_error, h3_closed_critical_stream,
'The control stream was closed. (RFC9114 6.2.1)'},
Expand All @@ -155,29 +149,22 @@ close_stream(StreamRef, State=#http3_machine{streams=Streams0}) ->

-spec frame(_, _, _, _) -> _. %% @todo

frame(Frame, IsFin, StreamRef, State) ->
frame(Frame, IsFin, StreamID, State) ->
case element(1, Frame) of
data -> data_frame(Frame, IsFin, StreamRef, State);
headers -> headers_frame(Frame, IsFin, StreamRef, State);
cancel_push -> cancel_push_frame(Frame, IsFin, StreamRef, State);
settings -> settings_frame(Frame, IsFin, StreamRef, State);
push_promise -> push_promise_frame(Frame, IsFin, StreamRef, State);
goaway -> goaway_frame(Frame, IsFin, StreamRef, State);
max_push_id -> max_push_id_frame(Frame, IsFin, StreamRef, State)
data -> data_frame(Frame, IsFin, StreamID, State);
headers -> headers_frame(Frame, IsFin, StreamID, State);
cancel_push -> cancel_push_frame(Frame, IsFin, StreamID, State);
settings -> settings_frame(Frame, IsFin, StreamID, State);
push_promise -> push_promise_frame(Frame, IsFin, StreamID, State);
goaway -> goaway_frame(Frame, IsFin, StreamID, State);
max_push_id -> max_push_id_frame(Frame, IsFin, StreamID, State)
end.

%% DATA frame.

%data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode,
% local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
% when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
% orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
% {error, {connection_error, protocol_error,
% 'DATA frame received on a stream in idle state. (RFC7540 5.1)'},
% State};
data_frame(Frame={data, Data}, IsFin, StreamRef, State) ->
data_frame(Frame={data, Data}, IsFin, StreamID, State) ->
DataLen = byte_size(Data),
case stream_get(StreamRef, State) of
case stream_get(StreamID, State) of
Stream = #stream{type=req, remote=nofin} ->
data_frame(Frame, IsFin, Stream, State, DataLen);
#stream{type=req, remote=idle} ->
Expand Down Expand Up @@ -222,8 +209,8 @@ is_body_size_valid(_) ->

%% HEADERS frame.

headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{mode=Mode}) ->
case stream_get(StreamRef, State) of
headers_frame(Frame, IsFin, StreamID, State=#http3_machine{mode=Mode}) ->
case stream_get(StreamID, State) of
%% Headers.
Stream=#stream{type=req, remote=idle} ->
headers_decode(Frame, IsFin, Stream, State, case Mode of
Expand All @@ -243,9 +230,8 @@ headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{mode=Mode}) ->
end.

%% @todo Check whether connection_error or stream_error fits better.
headers_decode({headers, EncodedFieldSection}, IsFin, Stream=#stream{ref=StreamRef},
headers_decode({headers, EncodedFieldSection}, IsFin, Stream=#stream{id=StreamID},
State=#http3_machine{decode_state=DecodeState0}, Type) ->
{ok, StreamID} = quicer:get_stream_id(StreamRef),
try cow_qpack:decode_field_section(EncodedFieldSection, StreamID, DecodeState0) of
{ok, Headers, DecData, DecodeState} ->
headers_pseudo_headers(Stream,
Expand Down Expand Up @@ -492,8 +478,7 @@ headers_parse_expected_size(Stream, State, IsFin, Type, DecData,
end
end.

headers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef},
IsFin, Type, DecData, PseudoHeaders, Headers, Len) ->
headers_frame(Stream0, State0, IsFin, Type, DecData, PseudoHeaders, Headers, Len) ->
Stream = case Type of
request ->
TE = case lists:keyfind(<<"te">>, 1, Headers) of
Expand All @@ -513,10 +498,10 @@ headers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef},
<<>> ->
{ok, {headers, IsFin, Headers, PseudoHeaders, Len}, State};
_ ->
{ok, {headers, IsFin, Headers, PseudoHeaders, Len}, {DecoderRef, DecData}, State}
{ok, {headers, IsFin, Headers, PseudoHeaders, Len}, DecData, State}
end.

trailers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, DecData, Headers) ->
trailers_frame(Stream0, State0, DecData, Headers) ->
Stream = Stream0#stream{remote=fin},
State = stream_store(Stream, State0),
%% @todo Error out if we didn't get the full body.
Expand All @@ -526,21 +511,21 @@ trailers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef}, Dec
<<>> ->
{ok, {trailers, Headers}, State};
_ ->
{ok, {trailers, Headers}, {DecoderRef, DecData}, State}
{ok, {trailers, Headers}, DecData, State}
end.%;
% false ->
% stream_reset(StreamID, State, protocol_error,
% 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
% end.

cancel_push_frame(Frame, _IsFin, StreamRef, State) ->
case stream_get(StreamRef, State) of
cancel_push_frame(Frame, _IsFin, StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(Frame, State)
end.

settings_frame(Frame, _IsFin, StreamRef, State) ->
case stream_get(StreamRef, State) of
settings_frame(Frame, _IsFin, StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(Frame, State);
#stream{type=req} ->
Expand All @@ -549,14 +534,14 @@ settings_frame(Frame, _IsFin, StreamRef, State) ->
State}
end.

push_promise_frame(Frame, _IsFin, StreamRef, State) ->
case stream_get(StreamRef, State) of
push_promise_frame(Frame, _IsFin, StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(Frame, State)
end.

goaway_frame(Frame, _IsFin, StreamRef, State) ->
case stream_get(StreamRef, State) of
goaway_frame(Frame, _IsFin, StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(Frame, State);
#stream{type=req} ->
Expand All @@ -565,8 +550,8 @@ goaway_frame(Frame, _IsFin, StreamRef, State) ->
State}
end.

max_push_id_frame(Frame, _IsFin, StreamRef, State) ->
case stream_get(StreamRef, State) of
max_push_id_frame(Frame, _IsFin, StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(Frame, State);
#stream{type=req} ->
Expand Down Expand Up @@ -608,8 +593,8 @@ control_frame(_Frame, State) ->

-spec ignored_frame(_, _) -> _. %% @todo

ignored_frame(StreamRef, State) ->
case stream_get(StreamRef, State) of
ignored_frame(StreamID, State) ->
case stream_get(StreamID, State) of
#stream{type=control} ->
control_frame(ignored_frame, State);
_ ->
Expand All @@ -625,10 +610,9 @@ ignored_frame(StreamRef, State) ->
% -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
-spec prepare_headers(_, _, _, _, _) -> todo.

prepare_headers(StreamRef, State=#http3_machine{encode_state=EncodeState0},
prepare_headers(StreamID, State=#http3_machine{encode_state=EncodeState0},
IsFin0, PseudoHeaders, Headers0) ->
{ok, StreamID} = quicer:get_stream_id(StreamRef),
Stream = #stream{method=Method, local=idle} = stream_get(StreamRef, State),
Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
IsFin = case {IsFin0, Method} of
{idle, _} -> nofin;
{_, <<"HEAD">>} -> fin;
Expand Down Expand Up @@ -672,8 +656,8 @@ merge_pseudo_headers(PseudoHeaders, Headers0) ->

-spec prepare_trailers(_, _, _) -> todo.

prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Trailers) ->
Stream = #stream{local=nofin, te=TE0} = stream_get(StreamRef, State),
prepare_trailers(StreamID, State=#http3_machine{encode_state=EncodeState0}, Trailers) ->
Stream = #stream{local=nofin, te=TE0} = stream_get(StreamID, State),
TE = try cow_http_hd:parse_te(TE0) of
{trailers, []} -> trailers;
_ -> no_trailers
Expand All @@ -683,7 +667,6 @@ prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Tra
end,
case TE of
trailers ->
{ok, StreamID} = quicer:get_stream_id(StreamRef),
{ok, HeaderBlock, EncData, EncodeState} = cow_qpack:encode_field_section(Trailers, StreamID, EncodeState0),
{trailers, HeaderBlock, EncData, stream_store(Stream#stream{local=fin},
State#http3_machine{encode_state=EncodeState})};
Expand All @@ -695,8 +678,8 @@ prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Tra

-spec reset_stream(_, _) -> todo.

reset_stream(StreamRef, State=#http3_machine{streams=Streams0}) ->
case maps:take(StreamRef, Streams0) of
reset_stream(StreamID, State=#http3_machine{streams=Streams0}) ->
case maps:take(StreamID, Streams0) of
{_, Streams} ->
{ok, State#http3_machine{streams=Streams}};
error ->
Expand All @@ -707,8 +690,8 @@ reset_stream(StreamRef, State=#http3_machine{streams=Streams0}) ->

-spec get_stream_local_state(_, _) -> todo.

get_stream_local_state(StreamRef, State) ->
case stream_get(StreamRef, State) of
get_stream_local_state(StreamID, State) ->
case stream_get(StreamID, State) of
#stream{local=IsFin} ->
{ok, IsFin};
%% Stream may never have been opened, or could have
Expand All @@ -721,8 +704,8 @@ get_stream_local_state(StreamRef, State) ->

-spec get_stream_remote_state(_, _) -> todo.

get_stream_remote_state(StreamRef, State) ->
case stream_get(StreamRef, State) of
get_stream_remote_state(StreamID, State) ->
case stream_get(StreamID, State) of
#stream{remote=IsFin} ->
{ok, IsFin};
%% Stream may never have been opened, or could have
Expand All @@ -733,14 +716,14 @@ get_stream_remote_state(StreamRef, State) ->

%% Stream-related functions.

stream_get(StreamRef, #http3_machine{streams=Streams}) ->
maps:get(StreamRef, Streams, undefined).
stream_get(StreamID, #http3_machine{streams=Streams}) ->
maps:get(StreamID, Streams, undefined).

%stream_store(#stream{ref=StreamRef, local=fin, remote=fin},
%stream_store(#stream{id=StreamID, local=fin, remote=fin},
% State=#http3_machine{streams=Streams0}) ->
% %% @todo We may want to wait for the QUIC message about stream closure.
% Streams = maps:remove(StreamRef, Streams0),
% Streams = maps:remove(StreamID, Streams0),
% State#http3_machine{streams=Streams};
stream_store(Stream=#stream{ref=StreamRef},
stream_store(Stream=#stream{id=StreamID},
State=#http3_machine{streams=Streams}) ->
State#http3_machine{streams=Streams#{StreamRef => Stream}}.
State#http3_machine{streams=Streams#{StreamID => Stream}}.
Loading

0 comments on commit b0b182e

Please sign in to comment.