Skip to content

Commit

Permalink
Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
essen committed Oct 19, 2023
1 parent f4bc8d5 commit 00eb7a2
Show file tree
Hide file tree
Showing 20 changed files with 210 additions and 113 deletions.
2 changes: 1 addition & 1 deletion ebin/cowboy.app
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
{applications, [kernel,stdlib,crypto,cowlib,ranch,quicer]},
{mod, {cowboy_app, []}},
{env, []}
]}.
]}.
11 changes: 7 additions & 4 deletions src/cowboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ start_quic(TransOpts, ProtoOpts) ->
{peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec.
{peer_bidi_stream_count, 100}
|SocketOpts0],
{ok, Listen} = quicer:listen(Port, SocketOpts),
ListenerPid = spawn(fun AcceptLoop() ->
{ok, Conn} = quicer:accept(Listen, []),
{ok, Listener} = quicer:listen(Port, SocketOpts),
ct:pal("listen ~p", [Listener]),
_ListenerPid = spawn(fun AcceptLoop() ->
{ok, Conn} = quicer:accept(Listener, []),
ct:pal("accept ~p", [Conn]),
{ok, Conn} = quicer:handshake(Conn),
ct:pal("handshake ~p", [Conn]),
Pid = spawn(fun() ->
receive go -> ok end,
process_flag(trap_exit, true), %% @todo Only if supervisor though.
Expand All @@ -94,7 +97,7 @@ start_quic(TransOpts, ProtoOpts) ->
Pid ! go,
AcceptLoop()
end),
{ok, ListenerPid}.
{ok, Listener}.

-spec start_quic_test() -> ok.
start_quic_test() ->
Expand Down
62 changes: 47 additions & 15 deletions src/cowboy_http3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
buffer = <<>> :: binary(),

%% Stream state.
state :: {module, any()}
state = undefined :: undefined | {module, any()}
}).

-record(state, {
Expand Down Expand Up @@ -75,6 +75,7 @@

-spec init(_, _, _) -> no_return().
init(Parent, Conn, Opts) ->
ct:pal("init"),
{ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
%% Immediately open a control, encoder and decoder stream.
{ok, ControlRef} = quicer:start_stream(Conn,
Expand Down Expand Up @@ -112,7 +113,8 @@ init(Parent, Conn, Opts) ->
'A socket error occurred when retrieving the sock name.'})
end.

loop(State0=#state{conn=Conn}) ->
loop(State0=#state{conn=Conn, children=Children}) ->
%ct:pal("~p", [process_info(self(), messages)]),
receive
%% Stream data.
%% @todo IsFin is inside Props. But it may not be set once the data was sent.
Expand Down Expand Up @@ -156,6 +158,10 @@ loop(State0=#state{conn=Conn}) ->
%% QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE
{quic, send_shutdown_complete, _StreamRef, _IsGraceful} ->
loop(State0);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
loop(State0);
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
Expand Down Expand Up @@ -477,7 +483,7 @@ down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
end.

info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
%ct:pal("INFO ~p", [Msg]),
%ct:pal("INFO ~p ~p ~p", [State, StreamID, Msg]),
case stream_get(State, StreamID) of
Stream=#stream{state=StreamState0} ->
try cowboy_stream:info(StreamID, Msg, StreamState0) of
Expand Down Expand Up @@ -522,17 +528,17 @@ commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
commands(State, Stream, Tail);
%% Send response headers.
commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
% ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
State = send_response(State0, Stream, StatusCode, Headers, Body),
commands(State, Stream, Tail);
%% Send response headers.
commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
% ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
State = send_headers(State0, Stream, nofin, StatusCode, Headers),
commands(State, Stream, Tail);
%%% Send a response body chunk.
commands(State0, Stream=#stream{ref=StreamRef}, [{data, IsFin, Data}|Tail]) ->
ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
% ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
_ = case Data of
{sendfile, Offset, Bytes, Path} ->
%% Temporary solution to do sendfile over QUIC.
Expand All @@ -548,7 +554,7 @@ commands(State0, Stream=#stream{ref=StreamRef}, [{data, IsFin, Data}|Tail]) ->
commands(State=#state{http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID, ref=StreamRef},
[{trailers, Trailers}|Tail]) ->
ct:pal("commands trailers ~p", [Trailers]),
% ct:pal("commands trailers ~p", [Trailers]),
HTTP3Machine = case cow_http3_machine:prepare_trailers(
StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
{trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
Expand Down Expand Up @@ -700,7 +706,7 @@ send_headers(State=#state{http3_machine=HTTP3Machine0},
= cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
#{status => cow_http:status_to_integer(StatusCode)},
headers_to_list(Headers)),
{ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(IsFin)),
quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(IsFin)),
%% @todo Send _EncData.
State#state{http3_machine=HTTP3Machine}.

Expand All @@ -716,6 +722,7 @@ send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.

reset_stream(State0=#state{http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID, ref=StreamRef}, Error) ->
%ct:pal("~p ~p", [Stream, Error]),
Reason = case Error of
{internal_error, _, _} -> h3_internal_error;
{stream_error, Reason0, _} -> Reason0
Expand All @@ -741,12 +748,15 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0},
State1.

stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) ->
%ct:pal("stop_stream ~p ~p", [State0, Stream]),
%% We abort reading when stopping the stream but only
%% if the client was not finished sending data.
%% We mark the stream as 'stopping' either way.
State = case cow_http3_machine:get_stream_remote_state(StreamID, HTTP3Machine) of
{ok, fin} ->
stream_store(State0, Stream#stream{status=stopping});
{error, not_found} ->
stream_store(State0, Stream#stream{status=stopping});
_ ->
stream_abort_receive(State0, Stream, h3_no_error)
end,
Expand All @@ -760,7 +770,7 @@ stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamI
%% When a response was sent but not terminated, we need to close the stream.
%% We send a final DATA frame to complete the stream.
{ok, nofin} ->
ct:pal("error nofin"),
% ct:pal("error nofin"),
info(State, StreamID, {data, fin, <<>>});
%% When a response was sent fully we can terminate the stream,
%% regardless of the stream being in half-closed or closed state.
Expand Down Expand Up @@ -860,14 +870,36 @@ stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
% ct:pal("new stream ~p ~p", [Stream, HTTP3Machine]),
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.

stream_closed(State=#state{http3_machine=HTTP3Machine0, streams=Streams0},
StreamID, _Flags) ->
stream_closed(State=#state{opts=Opts, http3_machine=HTTP3Machine0,
streams=Streams0, children=Children0}, StreamID, #{error := ErrorCode}) ->
case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
{ok, HTTP3Machine} ->
%% @todo Some streams may not be bidi or remote.
Streams = maps:remove(StreamID, Streams0),
%% @todo terminate stream if necessary
State#state{http3_machine=HTTP3Machine, streams=Streams};
case maps:take(StreamID, Streams0) of
{#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children.
State#state{http3_machine=HTTP3Machine, streams=Streams};
%% We only stop bidi streams if the stream was closed with an error
%% or the stream was already in the process of stopping.
{#stream{status=Status, state=StreamState}, Streams}
when Status =:= stopping; ErrorCode =/= 0 ->
terminate_stream_handler(State, StreamID, closed, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
State#state{http3_machine=HTTP3Machine, streams=Streams, children=Children};
%% Don't remove a stream that terminated properly but
%% has chosen to remain up (custom stream handlers).
{_, _} ->
State#state{http3_machine=HTTP3Machine};
error ->
case is_lingering_stream(State, StreamID) of
true ->
ok;
false ->
%% We avoid logging the data as it could be quite large.
cowboy:log(warning, "Received stream_closed for unknown stream ~p.",
[StreamID], Opts)
end,
State
end;
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end.
Expand Down
6 changes: 3 additions & 3 deletions test/compress_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ all() ->
{group, http_compress},
{group, https_compress},
{group, h2_compress},
{group, h2c_compress}
%% @todo h3_compress
{group, h2c_compress},
{group, h3_compress}
].

groups() ->
Expand All @@ -38,7 +38,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

%% Routes.

Expand Down
31 changes: 24 additions & 7 deletions test/cowboy_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@ init_http3(Ref, ProtoOpts, Config) ->
++ "/rfc9114_SUITE_data",
TransOpts = #{
socket_opts => [
{cert, DataDir ++ "/server.pem"},
{key, DataDir ++ "/server.key"}
{certfile, DataDir ++ "/server.pem"},
{keyfile, DataDir ++ "/server.key"}
]
},
{ok, _} = cowboy:start_quic(TransOpts, ProtoOpts), %% @todo Ref argument.
{ok, Listener} = cowboy:start_quic(TransOpts, ProtoOpts), %% @todo Ref argument.
%% @todo Keep listener information around in a better place.
persistent_term:put({cowboy_test_quic, Ref}, Listener),
[{ref, Ref}, {type, quic}, {protocol, http3}, {port, Port}, {opts, TransOpts}|Config].

stop_group(Ref) ->
case persistent_term:get({cowboy_test_quic, Ref}, undefined) of
undefined ->
cowboy:stop_listener(Ref);
Listener ->
quicer:close_listener(Listener)
end.

%% Common group of listeners used by most suites.

common_all() ->
Expand All @@ -67,7 +77,8 @@ common_all() ->
{group, http_compress},
{group, https_compress},
{group, h2_compress},
{group, h2c_compress}
{group, h2c_compress},
{group, h3_compress}
].

common_groups(Tests) ->
Expand All @@ -84,7 +95,8 @@ common_groups(Tests) ->
{http_compress, Opts, Tests},
{https_compress, Opts, Tests},
{h2_compress, Opts, Tests},
{h2c_compress, Opts, Tests}
{h2c_compress, Opts, Tests},
{h3_compress, [], Tests} %% @todo Enable parallel when issues get fixed.
].

init_common_groups(Name = http, Config, Mod) ->
Expand Down Expand Up @@ -128,7 +140,12 @@ init_common_groups(Name = h2c_compress, Config, Mod) ->
env => #{dispatch => Mod:init_dispatch(Config)},
stream_handlers => [cowboy_compress_h, cowboy_stream_h]
}, [{flavor, compress}|Config]),
lists:keyreplace(protocol, 1, Config1, {protocol, http2}).
lists:keyreplace(protocol, 1, Config1, {protocol, http2});
init_common_groups(Name = h3_compress, Config, Mod) ->
init_http3(Name, #{
env => #{dispatch => Mod:init_dispatch(Config)},
stream_handlers => [cowboy_compress_h, cowboy_stream_h]
}, [{flavor, compress}|Config]).

%% Support functions for testing using Gun.

Expand All @@ -138,7 +155,7 @@ gun_open(Config) ->
gun_open(Config, Opts) ->
TlsOpts = case proplists:get_value(no_cert, Config, false) of
true -> [{verify, verify_none}];
false -> ct_helper:get_certs_from_ets()
false -> ct_helper:get_certs_from_ets() %% @todo Wrong in current quicer.
end,
{ok, ConnPid} = gun:open("localhost", config(port, Config), Opts#{
retry => 0,
Expand Down
2 changes: 1 addition & 1 deletion test/loop_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

%% Dispatch configuration.

Expand Down
6 changes: 4 additions & 2 deletions test/metrics_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ init_per_group(Name = h2_compress, Config) ->
cowboy_test:init_http2(Name, init_compress_opts(Config), Config);
init_per_group(Name = h2c_compress, Config) ->
Config1 = cowboy_test:init_http(Name, init_compress_opts(Config), Config),
lists:keyreplace(protocol, 1, Config1, {protocol, http2}).
lists:keyreplace(protocol, 1, Config1, {protocol, http2});
init_per_group(Name = h3_compress, Config) ->
cowboy_test:init_http3(Name, init_compress_opts(Config), Config).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

init_plain_opts(Config) ->
#{
Expand Down
2 changes: 1 addition & 1 deletion test/misc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ init_per_group(Name, Config) ->
end_per_group(set_env, _) ->
ok;
end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

init_dispatch(_) ->
cowboy_router:compile([{"localhost", [
Expand Down
2 changes: 1 addition & 1 deletion test/plain_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

%% Routes.

Expand Down
2 changes: 1 addition & 1 deletion test/req_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

%% Routes.

Expand Down
2 changes: 1 addition & 1 deletion test/rest_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

%% Dispatch configuration.

Expand Down
2 changes: 1 addition & 1 deletion test/rfc6585_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ init_per_group(Name, Config) ->
cowboy_test:init_common_groups(Name, Config, ?MODULE).

end_per_group(Name, _) ->
cowboy:stop_listener(Name).
cowboy_test:stop_group(Name).

init_dispatch(_) ->
cowboy_router:compile([{"[...]", [
Expand Down
Loading

0 comments on commit 00eb7a2

Please sign in to comment.