Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl tcp unsend #52

Open
wants to merge 3 commits into
base: emqx-OTP-26.2.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions erts/emulator/drivers/common/inet_drv.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ static size_t my_strnlen(const char *s, size_t maxlen)
#define TCP_REQ_UNRECV 43
#define TCP_REQ_SHUTDOWN 44
#define TCP_REQ_SENDFILE 45
#define TCP_REQ_UNSEND 46
/* UDP and SCTP requests */
#define PACKET_REQ_RECV 60 /* Common for UDP and SCTP */
/* #define SCTP_REQ_LISTEN 61 MERGED Different from TCP; not for UDP */
Expand Down Expand Up @@ -11904,6 +11905,29 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
}

case TCP_REQ_UNSEND: {
ErlIOVec iov;
ErlDrvSizeT sz;

DDBG(INETP(desc),
("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] tcp_inet_ctl -> UNSEND\r\n",
__LINE__, desc->inet.s, driver_caller(desc->inet.port)) );
if (!IS_CONNECTED(INETP(desc)))
return ctl_error(ENOTCONN, rbuf, rsize);

sz = driver_peekqv(desc->inet.port, &iov);

if (0 == sz)
// this returns {error, ''}
return ctl_reply(INET_REP_ERROR, NULL, 0, rbuf, rsize);
else {
driver_outputv(desc->inet.port, "?unsend?", sizeof("?unsend?"), &iov, 0);
// We flush the queue, so no more writes/flush to the OS socket
sz = driver_deq(desc->inet.port, sz);
ASSERT(0 == driver_sizeq(desc->inet.port));
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
}
}

case TCP_REQ_SHUTDOWN: {
int how;
Expand Down
5 changes: 5 additions & 0 deletions erts/emulator/nifs/common/prim_socket_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket')
LOCAL_ATOM_DECL(nread); \
LOCAL_ATOM_DECL(nspace); \
LOCAL_ATOM_DECL(nwrite); \
LOCAL_ATOM_DECL(outq); \
LOCAL_ATOM_DECL(null); \
LOCAL_ATOM_DECL(num_acceptors); \
LOCAL_ATOM_DECL(num_cnt_bits); \
Expand Down Expand Up @@ -4881,6 +4882,10 @@ ERL_NIF_TERM esock_supports_ioctl_requests(ErlNifEnv* env)
requests = MKC(env, MKT2(env, atom_nspace, MKUL(env, FIONSPACE)), requests);
#endif

#if defined(TIOCOUTQ)
requests = MKC(env, MKT2(env, atom_outq, MKUL(env, TIOCOUTQ)), requests);
#endif

#if defined(SIOCATMARK)
requests = MKC(env, MKT2(env, atom_atmark, MKUL(env, SIOCATMARK)), requests);
#endif
Expand Down
23 changes: 23 additions & 0 deletions erts/emulator/nifs/unix/unix_socket_syncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,13 @@ static ERL_NIF_TERM essio_ioctl_gifconf(ErlNifEnv* env,
#define IOCTL_FIONSPACE_FUNC2_DEF
#endif

/* esock_ioctl_tiocoutq */
#if defined(TIOCOUTQ)
#define IOCTL_TIOCOUTQ_FUNC2_DEF IOCTL_GET_FUNC2_DEF(siocoutq)
#else
#define IOCTL_TIOCOUTQ_FUNC2_DEF
#endif

/* esock_ioctl_siocatmark */
#if defined(SIOCATMARK)
#define IOCTL_SIOCATMARK_FUNC2_DEF IOCTL_GET_FUNC2_DEF(siocatmark)
Expand All @@ -461,6 +468,7 @@ static ERL_NIF_TERM essio_ioctl_gifconf(ErlNifEnv* env,
IOCTL_FIONREAD_FUNC2_DEF; \
IOCTL_FIONWRITE_FUNC2_DEF; \
IOCTL_FIONSPACE_FUNC2_DEF; \
IOCTL_TIOCOUTQ_FUNC2_DEF; \
IOCTL_SIOCATMARK_FUNC2_DEF;
#define IOCTL_GET_FUNC2_DEF(F) \
static ERL_NIF_TERM essio_ioctl_##F(ErlNifEnv* env, \
Expand Down Expand Up @@ -3865,6 +3873,12 @@ ERL_NIF_TERM essio_ioctl2(ErlNifEnv* env,
break;
#endif

#if defined(TIOCOUTQ)
case TIOCOUTQ:
return essio_ioctl_siocoutq(env, descP);
break;
#endif

#if defined(SIOCATMARK)
case SIOCATMARK:
return essio_ioctl_siocatmark(env, descP);
Expand Down Expand Up @@ -4193,10 +4207,19 @@ ERL_NIF_TERM essio_ioctl_gifconf(ErlNifEnv* env,
#define IOCTL_FIONSPACE_FUNC2_DECL
#endif

/* *** essio_ioctl_siocoutq *** */
#if defined(TIOCOUTQ)
#define IOCTL_TIOCOUTQ_FUNC2_DECL \
IOCTL_GET_REQUEST2_DECL(siocoutq, TIOCOUTQ, ivalue)
#else
#define IOCTL_TIOCOUTQ_FUNC2_DECL
#endif

#define IOCTL_GET_FUNCS2 \
IOCTL_FIONREAD_FUNC2_DECL \
IOCTL_FIONWRITE_FUNC2_DECL \
IOCTL_FIONSPACE_FUNC2_DECL \
IOCTL_TIOCOUTQ_FUNC2_DECL \
IOCTL_SIOCATMARK_FUNC2_DECL

#define IOCTL_GET_REQUEST2_DECL(OR, R, EF) \
Expand Down
Binary file modified erts/preloaded/ebin/prim_inet.beam
Binary file not shown.
13 changes: 13 additions & 0 deletions erts/preloaded/src/prim_inet.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
-export([send/2, send/3, sendto/4, sendmsg/3, sendfile/4]).
-export([recv/2, recv/3, async_recv/3]).
-export([unrecv/2]).
-export([unsend/2]).
-export([recvfrom/2, recvfrom/3]).
-export([setopt/3, setopts/2, getopt/2, getopts/2, is_sockopt_val/2]).
-export([chgopt/3, chgopts/2]).
Expand Down Expand Up @@ -1468,6 +1469,18 @@ unrecv(S, Data) ->

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% UNSEND(insock(), data) -> ok | {error, Reason}
%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

unsend(S, Data) ->
case ctl_cmd(S, ?TCP_REQ_UNSEND, Data) of
{ok, _} -> ok;
{error,_}=Error -> Error
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% DETACH(insock()) -> ok
%%
%% unlink from a socket
Expand Down
1 change: 1 addition & 0 deletions lib/kernel/src/inet_int.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
-define(TCP_REQ_UNRECV, 43).
-define(TCP_REQ_SHUTDOWN, 44).
-define(TCP_REQ_SENDFILE, 45).
-define(TCP_REQ_UNSEND, 46).

%% UDP and SCTP requests
-define(PACKET_REQ_RECV, 60).
Expand Down
5 changes: 3 additions & 2 deletions lib/kernel/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4443,7 +4443,7 @@ peername(Socket) ->
Socket :: socket(),
Reason :: posix() | 'closed';

(Socket, GetRequest :: 'nread' | 'nwrite' | 'nspace') ->
(Socket, GetRequest :: 'nread' | 'nwrite' | 'outq' | 'nspace') ->
{'ok', NumBytes :: non_neg_integer()} | {'error', Reason} when
Socket :: socket(),
Reason :: posix() | 'closed';
Expand All @@ -4458,12 +4458,13 @@ peername(Socket) ->
Socket :: socket(),
Reason :: posix() | 'closed'.

%% gifconf | nread | nwrite | nspace | atmark |
%% gifconf | nread | nwrite | nspace | outq | atmark |
%% {gifaddr, string()} | {gifindex, string()} | {gifname, integer()}
ioctl(?socket(SockRef), gifconf = GetRequest) ->
prim_socket:ioctl(SockRef, GetRequest);
ioctl(?socket(SockRef), GetRequest) when (nread =:= GetRequest) orelse
(nwrite =:= GetRequest) orelse
(outq =:= GetRequest) orelse
(nspace =:= GetRequest) ->
prim_socket:ioctl(SockRef, GetRequest);
ioctl(?socket(SockRef), GetRequest) when (atmark =:= GetRequest) ->
Expand Down
Loading