Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial send on blocking sockets is not available in Windows #9

Merged
merged 33 commits into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4633aa3
Driver: make unregister more robust
mporsch Aug 9, 2023
746cb77
Socket: catch EAGAIN during partial send
mporsch Aug 9, 2023
a70d608
SocketAsync: add reason string to disconnect handler
mporsch Aug 9, 2023
32a4df3
SocketTls: clear retry flags before we might run into an exception
mporsch Aug 9, 2023
37f1a1b
SocketAsync: unregister immediately after disconnect
mporsch Aug 9, 2023
b4190df
test: fix disconnect handler signature
mporsch Aug 10, 2023
aa6ab50
Socket: make unused virtual interface final
mporsch Aug 10, 2023
59e4ac1
examples: print detailed connect error message
mporsch Aug 10, 2023
4b4160e
SocketTls: disconnect is signalled via exception, not zero return
mporsch Aug 10, 2023
28894af
SocketTls: ignore spurious readable after handshake
mporsch Aug 10, 2023
6e955f0
Socket: make all sockets non-blocking
mporsch Aug 11, 2023
7c7577b
Socket: in Windows send() returns WSAEWOULDBLOCK
mporsch Aug 11, 2023
77285c7
style
mporsch Aug 12, 2023
7753196
Socket, SocketTls: avoid repeated set non-blocking
mporsch Aug 12, 2023
d36b55b
style
mporsch Aug 12, 2023
160b9ae
SocketTls: correctly verify SSL_write requirements
mporsch Aug 13, 2023
5e7b1e2
test: dont randomly change packet size in retry
mporsch Aug 13, 2023
06acfe5
SocketAsync: simplify SSL_write retry check
mporsch Aug 13, 2023
b807fe2
test: avoid excessively inaccurate usec sleep
mporsch Aug 13, 2023
52c9f84
Socket: use string_view to avoid pointer arithmetic
mporsch Aug 13, 2023
6c1802d
SocketAsync: add debug prints
mporsch Aug 14, 2023
8514cb9
github: disable non-OSX builds
mporsch Aug 14, 2023
93255bb
test: disable stable tests
mporsch Aug 14, 2023
1be8c55
test: fix typos
mporsch Aug 14, 2023
4d187f2
Revert "test: disable stable tests"
mporsch Aug 14, 2023
dda6531
Socket: wait before writing to socket unless the interface says other…
mporsch Aug 14, 2023
68c6011
SocketTls: implement non-waiting send/receive for Async
mporsch Aug 19, 2023
90d980f
Async: implement explicit query+housekeeping for TLS socket
mporsch Aug 19, 2023
580e156
github: re-enable disabled builds
mporsch Aug 19, 2023
ea0bb32
SocketTls: allow more handshake in one Driver iteration, revert suppr…
mporsch Aug 20, 2023
3674d3e
Async: improve socket encapsulation, cleanups
mporsch Aug 20, 2023
b60ddad
remove unused error code detail check
mporsch Aug 20, 2023
f5aebf9
Socket: documentation
mporsch Aug 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ jobs:
# which fails at link time
type: Release

# uses pre-installed OpenSSL 1.1.1u and fails sockpuppet_tls_buffered_test with 'SSL routines:ssl3_write_bytes:bad length:ssl'
#- name: "Windows MSVC17 shared library using OpenSSL"
# os: windows-2022
# generator: -G "Visual Studio 17 2022" -A x64
# type: Debug
- name: "Windows MSVC17 shared library using OpenSSL"
os: windows-2022
generator: -G "Visual Studio 17 2022" -A x64
# uses pre-installed OpenSSL 1.1.1u
type: Debug

- name: "Unix GCC shared library using LibreSSL"
os: ubuntu-latest
Expand All @@ -36,6 +36,7 @@ jobs:
- name: "Unix GCC shared library using OpenSSL"
os: ubuntu-latest
generator: -G "Unix Makefiles"
# uses pre-installed OpenSSL 3.0.2
type: Debug

- name: "OSX clang shared library using LibreSSL"
Expand All @@ -47,6 +48,7 @@ jobs:
- name: "OSX clang shared library using OpenSSL"
os: macos-latest
generator: -G "Unix Makefiles"
# uses pre-installed OpenSSL 3.1.1
type: Debug
runs-on: ${{ matrix.os }}
steps:
Expand Down
6 changes: 3 additions & 3 deletions examples/sockpuppet_chat_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ struct ReconnectClient
to_string(client->LocalAddress()) +
" -> " +
to_string(remoteAddress));
} catch(std::exception const &) {
} catch(std::exception const &e) {
ioBuf.Print(
"failed to (re)connect to " +
to_string(remoteAddress) +
" (" + e.what() + ")"
" will retry in " +
std::to_string(delay.count()) +
"s");
std::to_string(delay.count()) + "s");

// schedule a reconnect attempt with increasing backoff delay
ToDo(driver,
Expand Down
3 changes: 2 additions & 1 deletion include/sockpuppet/socket_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ using ConnectHandler = std::function<void(SocketTcp, Address)>;
/// @param Address of peer that just disconnected from local socket.
/// Matches connect address the socket was created with or
/// obtained peer address of incoming connection.
/// @param Error/reason message.
/// @note After peer disconnect the socket is invalid and should be released.
using DisconnectHandler = std::function<void(Address)>;
using DisconnectHandler = std::function<void(Address, char const *)>;

/// UDP (unreliable communication) socket class that adds an interface for
/// an external socket driver to the buffered UDP class.
Expand Down
51 changes: 35 additions & 16 deletions src/driver_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct FdEqual

bool operator()(SocketAsyncImpl const &async) const
{
return (async.buff->sock->fd == fd);
return (async.DriverGetFd() == fd);
}

bool operator()(pollfd const &pfd) const
Expand Down Expand Up @@ -102,7 +102,7 @@ void Driver::DriverImpl::Step(Duration timeout)
StepGuard lock(*this);

if(todos.empty()) {
StepFds(timeout);
StepSockets(timeout);
} else {
// execute due ToDos while keeping track of the time
auto remaining =
Expand All @@ -113,7 +113,7 @@ void Driver::DriverImpl::Step(Duration timeout)
StepTodos(DeadlineLimited(timeout))));

// run sockets with remaining time
StepFds(remaining);
StepSockets(remaining);
}
}

Expand Down Expand Up @@ -144,8 +144,12 @@ Duration Driver::DriverImpl::StepTodos(Deadline deadline)
return Duration(0);
}

void Driver::DriverImpl::StepFds(Duration timeout)
void Driver::DriverImpl::StepSockets(Duration timeout)
{
// query sockets whether they request/suppress write poll
// the TLS socket uses this override for the TLS handshake
QuerySockets();

if(!Wait(pfds, timeout)) {
return; // timeout exceeded
}
Expand All @@ -157,7 +161,7 @@ void Driver::DriverImpl::StepFds(Duration timeout)
} else if(pfds.front().revents != 0) {
throw std::logic_error("unexpected signalling pipe poll result");
} else {
DoOneFdTask();
DoOneSocketTask();
}
}

Expand Down Expand Up @@ -193,26 +197,25 @@ void Driver::DriverImpl::ToDoMove(ToDoShared todo, TimePoint when)
todos.Move(std::move(todo), when);
}

void Driver::DriverImpl::AsyncRegister(
SocketAsyncImpl &sock)
void Driver::DriverImpl::AsyncRegister(SocketAsyncImpl &sock)
{
PauseGuard lock(*this);

sockets.emplace_back(sock);
pfds.emplace_back(pollfd{sock.buff->sock->fd, POLLIN, 0});
pfds.emplace_back(pollfd{sock.DriverGetFd(), POLLIN, 0});
}

void Driver::DriverImpl::AsyncUnregister(SOCKET fd)
{
PauseGuard lock(*this);

auto itSocket = std::find_if(begin(sockets), end(sockets), FdEqual{fd});
assert(itSocket != end(sockets));
sockets.erase(itSocket);
if(auto it = std::find_if(begin(sockets), end(sockets), FdEqual{fd}); it != end(sockets)) {
sockets.erase(it);
}

auto itPfd = std::find_if(begin(pfds), end(pfds), FdEqual{fd});
assert(itPfd != end(pfds));
pfds.erase(itPfd);
if(auto it = std::find_if(begin(pfds), end(pfds), FdEqual{fd}); it != end(pfds)) {
pfds.erase(it);
}
}

void Driver::DriverImpl::AsyncWantSend(SOCKET fd)
Expand Down Expand Up @@ -240,21 +243,37 @@ void Driver::DriverImpl::Unbump()
(void)pipeTo.ReceiveFrom(dump, sizeof(dump));
}

void Driver::DriverImpl::DoOneFdTask()
void Driver::DriverImpl::QuerySockets()
{
#ifdef SOCKPUPPET_WITH_TLS
assert(sockets.size() + 1U == pfds.size());

for(size_t i = 0U; i < sockets.size(); ++i) {
auto &&pfd = pfds[i + 1U];
auto &&sock = sockets[i].get();
assert(pfd.fd == sock.DriverGetFd());

sock.DriverQuery(pfd.events);
}
#endif // SOCKPUPPET_WITH_TLS
}

void Driver::DriverImpl::DoOneSocketTask()
{
assert(sockets.size() + 1U == pfds.size());

// user task may unregister/destroy a socket -> handle only one
for(size_t i = 0U; i < sockets.size(); ++i) {
auto &&pfd = pfds[i + 1U];
auto &&sock = sockets[i].get();
assert(pfd.fd == sock.buff->sock->fd);
assert(pfd.fd == sock.DriverGetFd());

if(pfd.revents & POLLIN) {
sock.DriverOnReadable();
return;
} else if(pfd.revents & POLLOUT) {
if(sock.DriverOnWritable()) {
// send queue emptied -> stop write poll
pfd.events &= ~POLLOUT;
}
return;
Expand Down
12 changes: 8 additions & 4 deletions src/driver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ struct Driver::DriverImpl
PauseGuard &operator=(PauseGuard &&) = delete;
};

/// internal signalling pipe for cancelling Step()
/// Internal signalling pipe for cancelling Step()
AddressShared pipeToAddr;
SocketImpl pipeFrom;
SocketImpl pipeTo;

/// Lists of managed ToDos and Sockets protected by mutex
std::recursive_mutex stepMtx;
std::mutex pauseMtx;
ToDos todos; // guarded by stepMtx
std::vector<SocketRef> sockets; // guarded by stepMtx
std::vector<pollfd> pfds; // front element belongs to internal signalling pipe; guarded by stepMtx

std::atomic<bool> shouldStop; ///< flag for cancelling Run()
std::atomic<bool> shouldStop; ///< Flag for cancelling Run()

DriverImpl();
DriverImpl(DriverImpl const &) = delete;
Expand All @@ -76,7 +77,7 @@ struct Driver::DriverImpl
void Step(Duration timeout);
template<typename Deadline>
Duration StepTodos(Deadline deadline);
void StepFds(Duration timeout);
void StepSockets(Duration timeout);

void Run();
void Stop();
Expand All @@ -91,10 +92,13 @@ struct Driver::DriverImpl
void AsyncUnregister(SOCKET fd);
void AsyncWantSend(SOCKET fd);

// interactions with signalling pipe
void Bump();
void Unbump();

void DoOneFdTask();
// interactions with sockets
void QuerySockets();
void DoOneSocketTask();
};

} // namespace sockpuppet
Expand Down
6 changes: 6 additions & 0 deletions src/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SocketUdp::SocketUdp(Address const &bindAddress)
{
impl->Bind(bindAddress.impl->ForUdp());
impl->SetSockOptBroadcast();
impl->SetSockOptNonBlocking();
}

size_t SocketUdp::SendTo(char const *data, size_t size,
Expand Down Expand Up @@ -54,6 +55,7 @@ SocketTcp::SocketTcp(Address const &connectAddress)
{
impl->SetSockOptNoSigPipe();
impl->Connect(connectAddress.impl->ForTcp());
impl->SetSockOptNonBlocking();
}

#ifdef SOCKPUPPET_WITH_TLS
Expand All @@ -65,6 +67,7 @@ SocketTcp::SocketTcp(Address const &connectAddress,
{
impl->SetSockOptNoSigPipe();
impl->Connect(connectAddress.impl->ForTcp());
impl->SetSockOptNonBlocking();
}
#endif // SOCKPUPPET_WITH_TLS

Expand Down Expand Up @@ -97,6 +100,7 @@ SocketTcp::SocketTcp(std::unique_ptr<SocketImpl> &&other)
: impl(std::move(other))
{
impl->SetSockOptNoSigPipe();
impl->SetSockOptNonBlocking();
}

SocketTcp::SocketTcp(SocketTcp &&other) noexcept = default;
Expand All @@ -112,6 +116,7 @@ Acceptor::Acceptor(Address const &bindAddress)
{
impl->SetSockOptReuseAddr();
impl->Bind(bindAddress.impl->ForTcp());
impl->SetSockOptNonBlocking();
}

#ifdef SOCKPUPPET_WITH_TLS
Expand All @@ -123,6 +128,7 @@ Acceptor::Acceptor(Address const &bindAddress,
{
impl->SetSockOptReuseAddr();
impl->Bind(bindAddress.impl->ForTcp());
impl->SetSockOptNonBlocking();
}
#endif // SOCKPUPPET_WITH_TLS

Expand Down
66 changes: 28 additions & 38 deletions src/socket_async_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,20 @@ bool SocketAsyncImpl::DoSendEnqueue(std::promise<void> promise, Args&&... args)

auto &q = std::get<Queue>(sendQ);
bool wasEmpty = q.empty();
assert(!pendingTlsSend || !wasEmpty); // queue cannot be empty while TLS send is pending
q.emplace(std::move(promise), std::forward<Args>(args)...);
return wasEmpty;
}

SOCKET SocketAsyncImpl::DriverGetFd() const
{
return buff->sock->fd;
}

void SocketAsyncImpl::DriverQuery(short &events)
{
buff->sock->DriverQuery(events);
}

void SocketAsyncImpl::DriverOnReadable()
{
onReadable();
Expand All @@ -130,21 +139,6 @@ void SocketAsyncImpl::DriverConnect(ConnectHandler const &onConnect)

void SocketAsyncImpl::DriverReceive(ReceiveHandler const &onReceive)
{
if(pendingTlsSend) {
// a previous TLS send failed because handshake receipt was pending
// which probably arrived now: repeat the same send call to handle
// the handshake and continue where it left off sending
// see https://www.openssl.org/docs/man1.1.1/man3/SSL_write.html
bool isSendQueueEmpty = DriverOnWritable();
if(!isSendQueueEmpty) {
if(auto ptr = driver.lock()) {
ptr->AsyncWantSend(buff->sock->fd);
}
}

return;
}

try {
auto buffer = buff->Receive();
if(buffer->empty()) {
Expand Down Expand Up @@ -189,30 +183,21 @@ bool SocketAsyncImpl::DriverSend(SendQ &q)
{
auto const sendQSize = q.size();
if(!sendQSize) {
throw std::logic_error("uncalled send");
// TLS socket has requested write for TLS handshake
buff->sock->DriverPending();
return true;
}

auto &&[promise, buffer] = q.front();
if(pendingTlsSend) {
assert(pendingTlsSend == buffer->data());
pendingTlsSend = nullptr;
}

try {
if(auto sent = buff->sock->SendSome(buffer->data(), buffer->size())) {
if(sent == buffer->size()) {
promise.set_value();
} else {
// allow partial send to avoid starving other driver's sockets if this one is rate limited
buffer->erase(0, sent);
return false;
}
} else { // zero-size sent data
// TLS can't send while handshake receipt pending:
// give up for now by proclaiming the send queue is empty,
// but actually keep the data queued and retry the exact same call on readable
pendingTlsSend = buffer->data();
return true;
auto sent = buff->sock->SendSome(buffer->data(), buffer->size());
if(sent == buffer->size()) {
promise.set_value();
} else {
// allow partial send to avoid starving other driver's sockets if this one is rate limited
// (may also be zero sent despite socket being writable when TLS handshake is pending)
buffer->erase(0, sent);
return false;
}
} catch(std::runtime_error const &e) {
promise.set_exception(std::make_exception_ptr(e));
Expand Down Expand Up @@ -248,9 +233,14 @@ void SocketAsyncImpl::DriverOnError(char const *message)
}

void SocketAsyncImpl::DriverDisconnect(DisconnectHandler const &onDisconnect,
AddressShared peerAddr, char const *)
AddressShared peerAddr, char const *reason)
{
onDisconnect(Address(std::move(peerAddr)));
// don't try to do any further receives that would just lead to more errors
if(auto ptr = driver.lock()) {
ptr->AsyncUnregister(buff->sock->fd);
}

onDisconnect(Address(std::move(peerAddr)), reason);
}


Expand Down
3 changes: 2 additions & 1 deletion src/socket_async_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ struct SocketAsyncImpl
std::function<void(char const *)> onError; // contains use-case-dependent data as bound arguments
mutable std::mutex sendQMtx;
std::variant<SendQ, SendToQ> sendQ; // use-case dependent queue type
char const *pendingTlsSend = nullptr; // flag to satisfy OpenSSL_write retry requirements

SocketAsyncImpl(std::unique_ptr<SocketBufferedImpl> &&buff,
DriverShared &driver,
Expand All @@ -57,6 +56,8 @@ struct SocketAsyncImpl
bool DoSendEnqueue(std::promise<void> promise, Args&&... args);

// in thread context of DriverImpl
SOCKET DriverGetFd() const;
void DriverQuery(short &events);
void DriverOnReadable();
void DriverConnect(ConnectHandler const &onConnect);
void DriverReceive(ReceiveHandler const &onReceive);
Expand Down
Loading
Loading