From 9f47774dc11f651db02f2a8ac38a42b1bb500726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 28 Sep 2023 15:38:27 +0200 Subject: [PATCH] WIP req_SUITE is green for h3 Some stuff skipped for now (peer cert, push). --- src/cow_http3_machine.erl | 68 +++++++++++++- src/cowboy_http3.erl | 183 +++++++++++++++++++++++++++++--------- test/req_SUITE.erl | 36 +++++--- 3 files changed, 231 insertions(+), 56 deletions(-) diff --git a/src/cow_http3_machine.erl b/src/cow_http3_machine.erl index 7036cdad2..54cce0124 100644 --- a/src/cow_http3_machine.erl +++ b/src/cow_http3_machine.erl @@ -20,11 +20,15 @@ -export([set_unidi_remote_stream_type/3]). -export([init_bidi_stream/2]). -export([init_bidi_stream/3]). +-export([close_bidi_stream_for_sending/2]). -export([close_stream/2]). -export([frame/4]). -export([ignored_frame/2]). -export([prepare_headers/5]). +-export([prepare_trailers/3]). -export([reset_stream/2]). +-export([get_stream_local_state/2]). +-export([get_stream_remote_state/2]). -record(stream, { ref :: any(), %% @todo specs @@ -131,6 +135,12 @@ init_bidi_stream(StreamRef, Method, State=#http3_machine{streams=Streams}) -> %% @todo set_bidi_method? +-spec close_bidi_stream_for_sending(_, _) -> _. %% @todo + +close_bidi_stream_for_sending(StreamRef, State=#http3_machine{streams=Streams}) -> + #{StreamRef := Stream} = Streams, + stream_store(Stream#stream{local=fin}, State). + -spec close_stream(_, _) -> _. %% @todo close_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> @@ -660,6 +670,27 @@ merge_pseudo_headers(PseudoHeaders, Headers0) -> [{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc] end, Headers0, maps:to_list(PseudoHeaders)). +-spec prepare_trailers(_, _, _) -> todo. + +prepare_trailers(StreamRef, State=#http3_machine{encode_state=EncodeState0}, Trailers) -> + Stream = #stream{local=nofin, te=TE0} = stream_get(StreamRef, State), + TE = try cow_http_hd:parse_te(TE0) of + {trailers, []} -> trailers; + _ -> no_trailers + catch _:_ -> + %% If we can't parse the TE header, assume we can't send trailers. + no_trailers + end, + case TE of + trailers -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {ok, HeaderBlock, EncData, EncodeState} = cow_qpack:encode_field_section(Trailers, StreamID, EncodeState0), + {trailers, HeaderBlock, EncData, stream_store(Stream#stream{local=fin}, + State#http3_machine{encode_state=EncodeState})}; + no_trailers -> + {no_trailers, stream_store(Stream#stream{local=fin}, State)} + end. + %% Public interface to reset streams. -spec reset_stream(_, _) -> todo. @@ -672,15 +703,44 @@ reset_stream(StreamRef, State=#http3_machine{streams=Streams0}) -> {error, not_found} end. +%% Retrieve the local state for a stream. + +-spec get_stream_local_state(_, _) -> todo. + +get_stream_local_state(StreamRef, State) -> + case stream_get(StreamRef, State) of + #stream{local=IsFin} -> + {ok, IsFin}; + %% Stream may never have been opened, or could have + %% already been closed. + undefined -> + {error, not_found} + end. + +%% Retrieve the remote state for a stream. + +-spec get_stream_remote_state(_, _) -> todo. + +get_stream_remote_state(StreamRef, State) -> + case stream_get(StreamRef, State) of + #stream{remote=IsFin} -> + {ok, IsFin}; + %% Stream may never have been opened, or could have + %% already been closed. + undefined -> + {error, not_found} + end. + %% Stream-related functions. stream_get(StreamRef, #http3_machine{streams=Streams}) -> maps:get(StreamRef, Streams, undefined). -stream_store(#stream{ref=StreamRef, local=fin, remote=fin}, - State=#http3_machine{streams=Streams0}) -> - Streams = maps:remove(StreamRef, Streams0), - State#http3_machine{streams=Streams}; +%stream_store(#stream{ref=StreamRef, local=fin, remote=fin}, +% State=#http3_machine{streams=Streams0}) -> +% %% @todo We may want to wait for the QUIC message about stream closure. +% Streams = maps:remove(StreamRef, Streams0), +% State#http3_machine{streams=Streams}; stream_store(Stream=#stream{ref=StreamRef}, State=#http3_machine{streams=Streams}) -> State#http3_machine{streams=Streams#{StreamRef => Stream}}. diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 45d8098b6..9038f8687 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -24,9 +24,14 @@ -export([init/3]). +%% Temporary callback to do sendfile over QUIC. +-export([send/2]). + -include_lib("quicer/include/quicer.hrl"). -record(stream, { + %% @todo We shouldn't use the QUIC reference because it is a NIF object + %% and if it's stored somewhere by the user it'll never get GC. ref :: any(), %% @todo specs %% Whether the stream is currently in a special state. @@ -143,8 +148,11 @@ loop(State0=#state{conn=Conn}) -> %% Messages pertaining to a stream. {{Pid, StreamRef}, Msg} when Pid =:= self() -> loop(info(State0, StreamRef, Msg)); - _Msg -> -% ct:pal("msg ~p", [Msg]), + %% Exit signal from children. + Msg = {'EXIT', Pid, _} -> + loop(down(State0, Pid, Msg)); + Msg -> + ct:pal("cowboy msg ~p", [Msg]), loop(State0) end. @@ -192,6 +200,7 @@ parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) -> case cow_http3:parse(Data) of {ok, Frame, Rest} -> IsFin = is_fin(Props, Rest), +% ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]), parse(frame(State, Stream, Frame, IsFin), Rest, StreamRef, Props); {more, Frame, Len} -> IsFin = is_fin(Props, <<>>), @@ -252,6 +261,7 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, end. frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, Frame, IsFin) -> +% ct:pal("cowboy frame ~p ~p", [Frame, IsFin]), case cow_http3_machine:frame(Frame, IsFin, StreamRef, HTTP3Machine0) of {ok, HTTP3Machine} -> State#state{http3_machine=HTTP3Machine}; @@ -323,7 +333,7 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock}, streamid => StreamRef, peer => Peer, sock => Sock, -% cert => Cert, %% @todo + cert => undefined, %Cert, %% @todo method => Method, scheme => Scheme, host => Host, @@ -351,6 +361,7 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock}, end. %% @todo Copied from cowboy_http2. +%% @todo Remove "http"? Probably. ensure_port(<<"http">>, undefined) -> 80; ensure_port(<<"https">>, undefined) -> 443; ensure_port(_, Port) -> Port. @@ -374,10 +385,11 @@ headers_to_map([{Name, Value}|Tail], Acc0) -> headers_frame(State=#state{opts=Opts, streams=Streams}, Stream=#stream{ref=StreamRef}, Req) -> +ct:pal("req ~p", [Req]), try cowboy_stream:init(StreamRef, Req, Opts) of {Commands, StreamState} -> -logger:error("~p", [Commands]), -logger:error("~p", [StreamState]), +%logger:error("~p", [Commands]), +%logger:error("~p", [StreamState]), commands(State#state{ streams=Streams#{StreamRef => Stream#stream{state=StreamState}}}, StreamRef, Commands) @@ -389,14 +401,38 @@ logger:error("~p", [StreamState]), 'Unhandled exception in cowboy_stream:init/3.'}) end. +%% Erlang messages. + +down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> + State = case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> + State0#state{children=Children}; + %% The stream is still running. + {ok, StreamRef, Children} -> + info(State0#state{children=Children}, StreamRef, Msg); + %% The process was unknown. + error -> + cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n", + [Msg, Pid], Opts), + State0 + end, + if +%% @todo +% State#state.http2_status =:= closing, State#state.streams =:= #{} -> +% terminate(State, {stop, normal, 'The connection is going away.'}); + true -> + State + end. + info(State=#state{opts=Opts, http3_machine=_HTTP3Machine, streams=Streams}, StreamRef, Msg) -> -logger:error("~p", [Msg]), +%ct:pal("INFO ~p", [Msg]), case Streams of #{StreamRef := Stream=#stream{state=StreamState0}} -> try cowboy_stream:info(StreamRef, Msg, StreamState0) of {Commands, StreamState} -> -logger:error("~p", [Commands]), -logger:error("~p ~p", [StreamRef, Streams]), +%ct:pal("~p", [Commands]), +%logger:error("~p ~p", [StreamRef, Streams]), commands(State#state{streams=Streams#{StreamRef => Stream#stream{state=StreamState}}}, StreamRef, Commands) catch Class:Exception:Stacktrace -> @@ -422,35 +458,59 @@ logger:error("~p ~p", [StreamRef, Streams]), commands(State, _, []) -> State; %% Error responses are sent only if a response wasn't sent already. -%commands(State=#state{http3_machine=HTTP3Machine}, StreamRef, -% [{error_response, StatusCode, Headers, Body}|Tail]) -> -% %% @todo +commands(State=#state{http3_machine=HTTP3Machine}, StreamRef, + [{error_response, StatusCode, Headers, Body}|Tail]) -> + %% @todo % case cow_http2_machine:get_stream_local_state(StreamRef, HTTP2Machine) of % {ok, idle, _} -> -% commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]); + commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]); % _ -> % commands(State, StreamRef, Tail) % end; %% Send an informational response. -%commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) -> -% State = send_headers(State0, StreamRef, idle, StatusCode, Headers), -% commands(State, StreamRef, Tail); +commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) -> + State = send_headers(State0, StreamRef, idle, StatusCode, Headers), + commands(State, StreamRef, Tail); %% Send response headers. commands(State0, StreamRef, [{response, StatusCode, Headers, Body}|Tail]) -> + ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]), State = send_response(State0, StreamRef, StatusCode, Headers, Body), commands(State, StreamRef, Tail); %% Send response headers. -%commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) -> -% State = send_headers(State0, StreamRef, nofin, StatusCode, Headers), -% commands(State, StreamRef, Tail); +commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) -> + ct:pal("commands headers ~p ~p", [StatusCode, Headers]), + State = send_headers(State0, StreamRef, nofin, StatusCode, Headers), + commands(State, StreamRef, Tail); %%% Send a response body chunk. -%commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) -> -% State = maybe_send_data(State0, StreamRef, IsFin, Data, []), -% commands(State, StreamRef, Tail); +commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) -> + ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]), + _ = case Data of + {sendfile, Offset, Bytes, Path} -> + %% Temporary solution to do sendfile over QUIC. + {ok, _} = ranch_transport:sendfile(?MODULE, StreamRef, + Path, Offset, Bytes, []), + {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(IsFin)); + _ -> + {ok, _} = quicer:send(StreamRef, cow_http3:data(Data), send_flag(IsFin)) + end, + State = maybe_send_is_fin(State0, StreamRef, IsFin), + commands(State, StreamRef, Tail); %%% Send trailers. -%commands(State0, StreamRef, [{trailers, Trailers}|Tail]) -> -% State = maybe_send_data(State0, StreamRef, fin, {trailers, maps:to_list(Trailers)}, []), -% commands(State, StreamRef, Tail); +commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trailers}|Tail]) -> + ct:pal("commands trailers ~p", [Trailers]), + HTTP3Machine = case cow_http3_machine:prepare_trailers( + StreamRef, HTTP3Machine0, maps:to_list(Trailers)) of + {trailers, HeaderBlock, _EncData, HTTP3Machine1} -> + ct:pal("trailers"), + %% @todo EncData!! + {ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(fin)), + HTTP3Machine1; + {no_trailers, HTTP3Machine1} -> + ct:pal("no_trailers"), + {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)), + HTTP3Machine1 + end, + commands(State#state{http3_machine=HTTP3Machine}, StreamRef, Tail); %% Send a push promise. %% %% @todo Responses sent as a result of a push_promise request @@ -524,6 +584,7 @@ commands(State=#state{children=Children}, StreamRef, [{spawn, Pid, Shutdown}|Tai commands(State, StreamRef, [{set_options, _Opts}|Tail]) -> commands(State, StreamRef, Tail); commands(State, StreamRef, [stop|_Tail]) -> + ct:pal("stop"), %% @todo Do we want to run the commands after a stop? %% @todo Do we even allow commands after? stop_stream(State, StreamRef); @@ -534,13 +595,13 @@ commands(State=#state{opts=Opts}, StreamRef, [Log={log, _, _, _}|Tail]) -> send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, Headers, Body) -> Size = case Body of - {sendfile, _, Bytes, _} -> Bytes; + {sendfile, _, Bytes0, _} -> Bytes0; _ -> iolist_size(Body) end, case Size of 0 -> State = send_headers(State0, StreamRef, fin, StatusCode, Headers), - maybe_terminate_stream(State, StreamRef, fin); + maybe_send_is_fin(State, StreamRef, fin); _ -> %% @todo Add a test for HEAD to make sure we don't send the body when %% returning {response...} from a stream handler (or {headers...} then {data...}). @@ -550,12 +611,33 @@ send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), %% @todo It might be better to do async sends. - {ok, _} = quicer:send(StreamRef, [ - cow_http3:headers(HeaderBlock), - cow_http3:data(Body) - ], send_flag(fin)), - State0#state{http3_machine=HTTP3Machine} - %% @todo maybe_terminate_stream (see maybe_send_data for how to handle it) + _ = case Body of + {sendfile, Offset, Bytes, Path} -> + {ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock)), + %% Temporary solution to do sendfile over QUIC. + {ok, _} = ranch_transport:sendfile(?MODULE, StreamRef, + Path, Offset, Bytes, []), + {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)); + _ -> + {ok, _} = quicer:send(StreamRef, [ + cow_http3:headers(HeaderBlock), + cow_http3:data(Body) + ], send_flag(fin)) + end, + maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, StreamRef, fin) + end. + +maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, StreamRef, fin) -> + HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamRef, HTTP3Machine0), + State#state{http3_machine=HTTP3Machine}; +maybe_send_is_fin(State, _, _) -> + State. + +%% Temporary callback to do sendfile over QUIC. +send(StreamRef, IoData) -> + case quicer:send(StreamRef, cow_http3:data(IoData)) of + {ok, _} -> ok; + Error -> Error end. send_headers(State=#state{http3_machine=HTTP3Machine0}, @@ -603,6 +685,34 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) -> % end. State1. +stop_stream(State0=#state{http3_machine=HTTP3Machine, streams=Streams}, StreamRef) -> + #{StreamRef := Stream} = Streams, + %% We abort reading when stopping the stream but only + %% if the client was not finished sending data. + State = case cow_http3_machine:get_stream_remote_state(StreamRef, HTTP3Machine) of + {ok, fin} -> + State0; + _ -> + stream_abort_receive(State0, Stream, h3_no_error) + end, + %% Then we may need to send a response or terminate it + %% if the stream handler did not do so already. + case cow_http3_machine:get_stream_local_state(StreamRef, HTTP3Machine) of + %% When the stream terminates normally (without resetting the stream) + %% and no response was sent, we need to send a proper response back to the client. + {ok, idle} -> + info(State, StreamRef, {response, 204, #{}, <<>>}); + %% When a response was sent but not terminated, we need to close the stream. + %% We send a final DATA frame to complete the stream. + {ok, nofin} -> + ct:pal("error nofin"), + info(State, StreamRef, {data, fin, <<>>}); + %% When a response was sent fully we can terminate the stream, + %% regardless of the stream being in half-closed or closed state. + _ -> + terminate_stream(State, StreamRef, h3_no_error) + end. + terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamRef, Reason) -> case maps:take(StreamRef, Streams0) of {#stream{state=StreamState}, Streams} -> @@ -622,15 +732,6 @@ terminate_stream_handler(#state{opts=Opts}, StreamRef, Reason, StreamState) -> Class, Exception, Stacktrace), Opts) end. - - -stop_stream(_, _) -> - error(todo). - -%% @todo Maybe do this on stream close instead. -maybe_terminate_stream(State, _, _) -> - State. - ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef}) -> case cow_http3_machine:ignored_frame(StreamRef, HTTP3Machine0) of {ok, HTTP3Machine} -> diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index 352b2a041..c31862280 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -181,7 +181,8 @@ bindings(Config) -> cert(Config) -> case config(type, Config) of tcp -> doc("TLS certificates can only be provided over TLS."); - ssl -> do_cert(Config) + ssl -> do_cert(Config); + quic -> {skip, "Implement using quicer:peercert/1."} end. do_cert(Config0) -> @@ -377,7 +378,8 @@ port(Config) -> Port = do_get_body("/direct/port", Config), ExpectedPort = case config(type, Config) of tcp -> <<"80">>; - ssl -> <<"443">> + ssl -> <<"443">>; + quic -> <<"443">> end, ExpectedPort = do_get_body("/port", [{<<"host">>, <<"localhost">>}], Config), ExpectedPort = do_get_body("/direct/port", [{<<"host">>, <<"localhost">>}], Config), @@ -403,7 +405,8 @@ do_scheme(Path, Config) -> Transport = config(type, Config), case do_get_body(Path, Config) of <<"http">> when Transport =:= tcp -> ok; - <<"https">> when Transport =:= ssl -> ok + <<"https">> when Transport =:= ssl -> ok; + <<"https">> when Transport =:= quic -> ok end. sock(Config) -> @@ -416,7 +419,8 @@ uri(Config) -> doc("Request URI building/modification."), Scheme = case config(type, Config) of tcp -> <<"http">>; - ssl -> <<"https">> + ssl -> <<"https">>; + quic -> <<"https">> end, SLen = byte_size(Scheme), Port = integer_to_binary(config(port, Config)), @@ -450,7 +454,8 @@ do_version(Path, Config) -> Protocol = config(protocol, Config), case do_get_body(Path, Config) of <<"HTTP/1.1">> when Protocol =:= http -> ok; - <<"HTTP/2">> when Protocol =:= http2 -> ok + <<"HTTP/2">> when Protocol =:= http2 -> ok; + <<"HTTP/3">> when Protocol =:= http3 -> ok end. %% Tests: Request body. @@ -598,6 +603,8 @@ do_read_urlencoded_body_too_long(Path, Body, Config) -> %% 408 error responses should close HTTP/1.1 connections. {_, <<"close">>} = lists:keyfind(<<"connection">>, 1, RespHeaders); http2 -> + ok; + http3 -> ok end, gun:close(ConnPid). @@ -1008,8 +1015,11 @@ stream_body_content_length_nofin_error(Config) -> end end; http2 -> - %% @todo HTTP2 should have the same content-length checks - ok + %% @todo HTTP/2 should have the same content-length checks. + {skip, "Implement the test for HTTP/2."}; + http3 -> + %% @todo HTTP/3 should have the same content-length checks. + {skip, "Implement the test for HTTP/3."} end. stream_body_concurrent(Config) -> @@ -1127,26 +1137,30 @@ do_trailers(Path, Config) -> push(Config) -> case config(protocol, Config) of http -> do_push_http("/resp/push", Config); - http2 -> do_push_http2(Config) + http2 -> do_push_http2(Config); + http3 -> {skip, "Implement server push for HTTP/3."} end. push_method(Config) -> case config(protocol, Config) of http -> do_push_http("/resp/push/method", Config); - http2 -> do_push_http2_method(Config) + http2 -> do_push_http2_method(Config); + http3 -> {skip, "Implement server push for HTTP/3."} end. push_origin(Config) -> case config(protocol, Config) of http -> do_push_http("/resp/push/origin", Config); - http2 -> do_push_http2_origin(Config) + http2 -> do_push_http2_origin(Config); + http3 -> {skip, "Implement server push for HTTP/3."} end. push_qs(Config) -> case config(protocol, Config) of http -> do_push_http("/resp/push/qs", Config); - http2 -> do_push_http2_qs(Config) + http2 -> do_push_http2_qs(Config); + http3 -> {skip, "Implement server push for HTTP/3."} end. do_push_http(Path, Config) ->