Skip to content

Commit

Permalink
Merge pull request #55 from qzhuyan/dev/william/conn-close-when-owner…
Browse files Browse the repository at this point in the history
…-dead

- immediate close conn when the owner is dead
- API: conn/stream shutdown with Flags/Reason
  • Loading branch information
qzhuyan authored Aug 24, 2021
2 parents ac9deff + 506134c commit 348c08a
Show file tree
Hide file tree
Showing 15 changed files with 494 additions and 86 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,20 @@ quicer:connection(Hostname, Port, Options, Timeout) ->
{ok, Connection} | {error, any()} | {error, any(), ErrorCode::integer()}.
```

### close_connection
### Close_connection

``` erlang
quicer:close_connection(Connection) -> ok.
quicer:close_connection(Connection, Timeout) -> ok.
quicer:close_connection(Connection, Flag, Reason) -> ok.
quicer:close_connection(Connection, Flag, Reason, Timeout) -> ok.

Flag :: ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE | ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT.
```

Gracefully Shutdown connection.
Shutdown connection with app specific reason, it also implicitly shuts down the streams.

`QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT` is used for lowmem scenarios without sending a connection_close frame to the peer.

## Stream API

Expand Down Expand Up @@ -253,10 +260,15 @@ note, the requested Len cannot exceeed the stream recv window size of connection
### Shutdown stream

``` erlang
quicer:close_stream(Stream) -> ok.
quicer:close_stream(Stream) -> ok | {error, any()}.
quicer:close_stream(Stream, Timeout) -> ok | {error, any()}.
quicer:close_stream(Stream, Flags, Reason, Timeout) -> ok | {error, any()}.
```
Shutdown stream with an app specific reason (integer) indicate to the peer as the reason for the shutdown.

Use flags to control of the behavior of shutdown, check ?QUIC_STREAM_SHUTDOWN_FLAG_* in =quicer.hrl= for more.

Shutdown stream gracefully.
note, could return error if wrong combination of flags are set.

### Get/Set Connection/Stream Opts

Expand Down
24 changes: 19 additions & 5 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ _IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN is_destroy = FALSE;

enif_mutex_lock(c_ctx->lock);
TP_CB_3(event, Connection, Event->Type);
switch (Event->Type)
{
case QUIC_CONNECTION_EVENT_CONNECTED:
Expand All @@ -150,6 +151,7 @@ _IRQL_requires_max_(DISPATCH_LEVEL)
ATOM_CONNECTED,
enif_make_resource(env, c_ctx))))
{
TP_CB_3(app_down, Connection, Event->Type);
enif_mutex_unlock(c_ctx->lock);
return QUIC_STATUS_INTERNAL_ERROR;
}
Expand Down Expand Up @@ -281,6 +283,7 @@ ServerConnectionCallback(HQUIC Connection,
BOOLEAN is_destroy = FALSE;

enif_mutex_lock(c_ctx->lock);
TP_CB_3(event, Connection, Event->Type);
switch (Event->Type)
{
case QUIC_CONNECTION_EVENT_CONNECTED:
Expand Down Expand Up @@ -361,6 +364,9 @@ ServerConnectionCallback(HQUIC Connection,
// The connection has completed the shutdown process and is ready to be
// safely cleaned up.
//
TP_CB_3(shutdown_complete,
Connection,
Event->SHUTDOWN_COMPLETE.AppCloseInProgress);
report = enif_make_tuple3(
env, ATOM_QUIC, ATOM_CLOSED, enif_make_resource(env, c_ctx));

Expand Down Expand Up @@ -587,23 +593,31 @@ async_accept2(ErlNifEnv *env,

//@todo, shutdown with error
ERL_NIF_TERM
close_connection1(ErlNifEnv *env,
close_connection3(ErlNifEnv *env,
__unused_parm__ int argc,
const ERL_NIF_TERM argv[])
{
QuicerConnCTX *c_ctx;
uint32_t app_errcode = 0, flags = 0;
if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!enif_get_uint(env, argv[1], &flags))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!enif_get_uint(env, argv[2], &app_errcode))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
enif_mutex_lock(c_ctx->lock);
if (!c_ctx->is_closed)
{
c_ctx->is_closed = TRUE;
MsQuic->ConnectionShutdown(c_ctx->Connection,
//@todo, check rfc for the error code
QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
0);
MsQuic->ConnectionShutdown(c_ctx->Connection, flags, app_errcode);
}
enif_mutex_unlock(c_ctx->lock);
return ATOM_OK;
Expand Down
2 changes: 1 addition & 1 deletion c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async_connect3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM
async_accept2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM
close_connection1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
close_connection3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM sockname1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

QUIC_STATUS ServerConnectionCallback(HQUIC Connection,
Expand Down
7 changes: 6 additions & 1 deletion c_src/quicer_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ init_c_ctx()
void
destroy_c_ctx(QuicerConnCTX *c_ctx)
{
// Since enif_release_resource is async call,
// we should demon the owner now!
enif_demonitor_process(c_ctx->env, c_ctx, &c_ctx->owner_mon);
enif_release_resource(c_ctx);
}

Expand All @@ -82,7 +85,9 @@ init_s_ctx()
void
destroy_s_ctx(QuicerStreamCTX *s_ctx)
{
// note, see resource_stream_dealloc_callback
// Since enif_release_resource is async call,
// we should demon the owner now!
enif_demonitor_process(s_ctx->env, s_ctx, &s_ctx->owner_mon);
enif_release_resource(s_ctx);
}

Expand Down
36 changes: 25 additions & 11 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,42 +513,55 @@ resource_listener_down_callback(__unused_parm__ ErlNifEnv *caller_env,
}

void
resource_conn_dealloc_callback(__unused_parm__ ErlNifEnv *caller_env,
void *obj)
resource_conn_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj)
{
QuicerConnCTX *c_ctx = (QuicerConnCTX *)obj;
TP_CB_3(start, c_ctx->Connection, 0);
enif_demonitor_process(c_ctx->env, c_ctx, &c_ctx->owner_mon);
AcceptorQueueDestroy(c_ctx->acceptor_queue);
enif_free_env(c_ctx->env);
enif_mutex_destroy(c_ctx->lock);
CXPLAT_FREE(c_ctx->TlsSecrets, QUICER_TLS_SECRETS);
CXPLAT_FREE(c_ctx->ssl_keylogfile, QUICER_TRACE);
AcceptorDestroy(c_ctx->owner);
TP_CB_3(end, c_ctx->Connection, 0);
}

void
resource_conn_down_callback(__unused_parm__ ErlNifEnv *caller_env,
__unused_parm__ void *obj,
resource_conn_down_callback(__unused_parm__ ErlNifEnv *env,
void *ctx,
__unused_parm__ ErlNifPid *pid,
__unused_parm__ ErlNifMonitor *mon)
{
// todo
QuicerConnCTX *c_ctx = ctx;
if (!ctx)
{
return;
}
else
{
TP_CB_3(start, c_ctx->Connection, (uint64_t)ctx);
MsQuic->ConnectionShutdown(
c_ctx->Connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0);
TP_CB_3(end, c_ctx->Connection, (uint64_t)ctx);
}
}

void
resource_stream_dealloc_callback(__unused_parm__ ErlNifEnv *caller_env,
void *obj)
resource_stream_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj)
{
QuicerStreamCTX *s_ctx = (QuicerStreamCTX *)obj;
TP_CB_3(start, s_ctx->Stream, 0);
enif_mutex_lock(s_ctx->lock);
enif_free_env(s_ctx->env);
enif_mutex_unlock(s_ctx->lock);
enif_mutex_destroy(s_ctx->lock);
TP_CB_3(end, s_ctx->Stream, 0);
}

void
resource_stream_down_callback(__unused_parm__ ErlNifEnv *env,
__unused_parm__ void *ctx,
void *ctx,
__unused_parm__ ErlNifPid *pid,
__unused_parm__ ErlNifMonitor *mon)
{
Expand Down Expand Up @@ -858,7 +871,7 @@ connection_controlling_process(ErlNifEnv *env,
const ErlNifPid *caller,
const ERL_NIF_TERM *pid)
{

TP_NIF_3(enter, c_ctx->Connection, (uint64_t)&c_ctx);
if (0 != enif_compare_pids(&c_ctx->owner->Pid, caller))
{
return ERROR_TUPLE_2(ATOM_NOT_OWNER);
Expand All @@ -879,6 +892,7 @@ connection_controlling_process(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_OWNER_DEAD);
}

TP_NIF_3(exit, c_ctx->Connection, (uint64_t)&c_ctx);
return ATOM_OK;
}

Expand Down Expand Up @@ -928,12 +942,12 @@ static ErlNifFunc nif_funcs[] = {
{ "async_connect", 3, async_connect3, 0},
{ "async_accept", 2, async_accept2, 0},
{ "async_handshake", 1, async_handshake_1, 0},
{ "async_close_connection", 1, close_connection1, 0},
{ "async_close_connection", 3, close_connection3, 0},
{ "async_accept_stream", 2, async_accept_stream2, 0},
{ "start_stream", 2, async_start_stream2, 0},
{ "send", 3, send3, 0},
{ "recv", 2, recv2, 0},
{ "async_close_stream", 1, close_stream1, 0},
{ "async_close_stream", 3, close_stream3, 0},
{ "sockname", 1, sockname1, 0},
{ "getopt", 3, getopt3, 0},
{ "setopt", 3, setopt3, 0},
Expand Down
20 changes: 16 additions & 4 deletions c_src/quicer_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event)
//
// The peer aborted its send direction of the stream.
//
TP_CB_3(peer_send_aborted, Stream, Event->PEER_SEND_ABORTED.ErrorCode);
report = enif_make_tuple4(
env,
ATOM_QUIC,
Expand Down Expand Up @@ -593,26 +594,37 @@ recv2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
}

ERL_NIF_TERM
close_stream1(ErlNifEnv *env,
close_stream3(ErlNifEnv *env,
__unused_parm__ int argc,
const ERL_NIF_TERM argv[])
{
QUIC_STATUS Status;
ERL_NIF_TERM ret = ATOM_OK;
QuicerStreamCTX *s_ctx;
uint32_t app_errcode = 0, flags = 0;
if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

// only check type, actual flag will be validated by msquic
if (!enif_get_uint(env, argv[1], &flags))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!enif_get_uint(env, argv[2], &app_errcode))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
//@todo support application specific error code.
// we don't use trylock since we are in NIF call.
enif_mutex_lock(s_ctx->lock);
enif_keep_resource(s_ctx);
if (!s_ctx->is_closed)
{
if (QUIC_FAILED(
Status = MsQuic->StreamShutdown(
s_ctx->Stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)))
if (QUIC_FAILED(Status = MsQuic->StreamShutdown(
s_ctx->Stream, flags, app_errcode)))
{
ret = ERROR_TUPLE_2(ETERM_INT(Status));
}
Expand Down
2 changes: 1 addition & 1 deletion c_src/quicer_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ERL_NIF_TERM send3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM recv2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
close_stream1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
close_stream3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

_IRQL_requires_max_(DISPATCH_LEVEL)
_Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API
Expand Down
17 changes: 17 additions & 0 deletions test/quicer_nif_macro.hrl → include/quicer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-ifndef(QUICER_HRL).
-define(QUICER_HRL, true).

%%% ========================================
%%% mirror macro from NIF code
Expand Down Expand Up @@ -51,3 +53,18 @@
-define(QUIC_CONNECTION_EVENT_RESUMED , 13).
-define(QUIC_CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED , 14).
-define(QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED , 15).


%% STREAM SHUTDOWN FLAGS
-define(QUIC_STREAM_SHUTDOWN_FLAG_NONE , 0).
-define(QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL , 1). % Cleanly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND , 2). % Abruptly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE , 4). % Abruptly closes the receive path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT , 6). % Abruptly closes both send and receive paths.
-define(QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE , 8).


%% CONNECTED SHUTDOWN FLAGS
-define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE , 0).
-define(QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT , 1).
-endif. %% QUICER_HRL
Loading

0 comments on commit 348c08a

Please sign in to comment.