diff --git a/Drv/Ip/IpSocket.cpp b/Drv/Ip/IpSocket.cpp index 2c148d3087..0645959791 100644 --- a/Drv/Ip/IpSocket.cpp +++ b/Drv/Ip/IpSocket.cpp @@ -23,17 +23,17 @@ #ifdef TGT_OS_TYPE_VXWORKS #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #elif defined TGT_OS_TYPE_LINUX || TGT_OS_TYPE_DARWIN #include #include @@ -52,7 +52,7 @@ IpSocket::IpSocket() : m_timeoutSeconds(0), m_timeoutMicroseconds(0), m_port(0) SocketIpStatus IpSocket::configure(const char* const hostname, const U16 port, const U32 timeout_seconds, const U32 timeout_microseconds) { FW_ASSERT(timeout_microseconds < 1000000, static_cast(timeout_microseconds)); - FW_ASSERT(this->isValidPort(port)); + FW_ASSERT(this->isValidPort(port), static_cast(port)); FW_ASSERT(hostname != nullptr); this->m_timeoutSeconds = timeout_seconds; this->m_timeoutMicroseconds = timeout_microseconds; @@ -65,7 +65,7 @@ bool IpSocket::isValidPort(U16 port) { return true; } -SocketIpStatus IpSocket::setupTimeouts(NATIVE_INT_TYPE socketFd) { +SocketIpStatus IpSocket::setupTimeouts(PlatformIntType socketFd) { // Get the IP address from host #ifdef TGT_OS_TYPE_VXWORKS // No timeouts set on Vxworks @@ -103,38 +103,39 @@ SocketIpStatus IpSocket::addressToIp4(const char* address, void* ip4) { return SOCK_SUCCESS; } -void IpSocket::close(NATIVE_INT_TYPE fd) { - (void)::shutdown(fd, SHUT_RDWR); - (void)::close(fd); -} - -void IpSocket::shutdown(NATIVE_INT_TYPE fd) { - this->close(fd); +void IpSocket::close(const SocketDescriptor& socketDescriptor) { + (void)::close(socketDescriptor.fd); } -SocketIpStatus IpSocket::startup() { - // no op for non-server components - return SOCK_SUCCESS; +void IpSocket::shutdown(const SocketDescriptor& socketDescriptor) { + errno = 0; + PlatformIntType status = ::shutdown(socketDescriptor.fd, SHUT_RDWR); + // If shutdown fails, go straight to the hard-shutdown + if (status != 0) { + this->close(socketDescriptor); + } } -SocketIpStatus IpSocket::open(NATIVE_INT_TYPE& fd) { +SocketIpStatus IpSocket::open(SocketDescriptor& socketDescriptor) { SocketIpStatus status = SOCK_SUCCESS; + errno = 0; // Open a TCP socket for incoming commands, and outgoing data if not using UDP - status = this->openProtocol(fd); + status = this->openProtocol(socketDescriptor); if (status != SOCK_SUCCESS) { - FW_ASSERT(fd == -1); // Ensure we properly kept closed on error + socketDescriptor.fd = -1; return status; } return status; } -SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) { +SocketIpStatus IpSocket::send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) { U32 total = 0; I32 sent = 0; // Attempt to send out data and retry as necessary for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (total < size); i++) { + errno = 0; // Send using my specific protocol - sent = this->sendProtocol(fd, data + total, size - total); + sent = this->sendProtocol(socketDescriptor, data + total, size - total); // Error is EINTR or timeout just try again if (((sent == -1) && (errno == EINTR)) || (sent == 0)) { continue; @@ -159,13 +160,13 @@ SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U3 return SOCK_SUCCESS; } -SocketIpStatus IpSocket::recv(NATIVE_INT_TYPE fd, U8* data, U32& req_read) { +SocketIpStatus IpSocket::recv(const SocketDescriptor& socketDescriptor, U8* data, U32& req_read) { I32 size = 0; - // Try to read until we fail to receive data for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (size <= 0); i++) { + errno = 0; // Attempt to recv out data - size = this->recvProtocol(fd, data, req_read); + size = this->recvProtocol(socketDescriptor, data, req_read); // Nothing to be received if ((size == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) { diff --git a/Drv/Ip/IpSocket.hpp b/Drv/Ip/IpSocket.hpp index 19501edd0e..554bb5fe65 100644 --- a/Drv/Ip/IpSocket.hpp +++ b/Drv/Ip/IpSocket.hpp @@ -17,6 +17,12 @@ #include namespace Drv { + +struct SocketDescriptor final { + PlatformIntType fd = -1; //!< Used for all sockets to track the communication file descriptor + PlatformIntType serverFd = -1; //!< Used for server sockets to track the listening file descriptor +}; + /** * \brief Status enumeration for socket return values */ @@ -36,7 +42,8 @@ enum SocketIpStatus { SOCK_SEND_ERROR = -13, //!< Failed to send after configured retries SOCK_NOT_STARTED = -14, //!< Socket has not been started SOCK_FAILED_TO_READ_BACK_PORT = -15, //!< Failed to read back port from connection - SOCK_NO_DATA_AVAILABLE = -16 //!< No data available or read operation would block + SOCK_NO_DATA_AVAILABLE = -16, //!< No data available or read operation would block + SOCK_ANOTHER_THREAD_OPENING = -17 //!< Another thread is opening }; /** @@ -70,16 +77,6 @@ class IpSocket { SocketIpStatus configure(const char* hostname, const U16 port, const U32 send_timeout_seconds, const U32 send_timeout_microseconds); - /** - * \brief startup the socket, a no-op on unless this is server - * - * This will start-up the socket. In the case of most sockets, this is a no-op. On server sockets this binds to the - * server address and progresses through the `listen` step such that on `open` new clients may be accepted. - * - * \return status of startup - */ - virtual SocketIpStatus startup(); - /** * \brief open the IP socket for communications * @@ -95,10 +92,10 @@ class IpSocket { * * Note: delegates to openProtocol for protocol specific implementation * - * \param fd: file descriptor to open + * \param socketDescriptor: socket descriptor to update with opened port * \return status of open */ - SocketIpStatus open(NATIVE_INT_TYPE& fd); + SocketIpStatus open(SocketDescriptor& socketDescriptor); /** * \brief send data out the IP socket from the given buffer * @@ -115,7 +112,7 @@ class IpSocket { * \param size: size of data to send * \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error */ - SocketIpStatus send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size); + SocketIpStatus send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size); /** * \brief receive data from the IP socket from the given buffer * @@ -127,31 +124,35 @@ class IpSocket { * * Note: delegates to `recvProtocol` to send the data * - * \param fd: file descriptor to recv from + * \param socketDescriptor: socket descriptor to recv from * \param data: pointer to data to fill with received data * \param size: maximum size of data buffer to fill * \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error */ - SocketIpStatus recv(NATIVE_INT_TYPE fd, U8* const data, U32& size); + SocketIpStatus recv(const SocketDescriptor& fd, U8* const data, U32& size); + /** * \brief closes the socket * * Closes the socket opened by the open call. In this case of the TcpServer, this does NOT close server's listening - * port (call `shutdown`) but will close the active client connection. + * port but will close the active client connection. * - * \param fd: file descriptor to close + * \param socketDescriptor: socket descriptor to close */ - void close(NATIVE_INT_TYPE fd); + void close(const SocketDescriptor& socketDescriptor); /** * \brief shutdown the socket * - * Closes the socket opened by the open call. In this case of the TcpServer, this does close server's listening - * port. This will shutdown all clients. + * Shuts down the socket opened by the open call. In this case of the TcpServer, this does shut down server's + * listening port, but rather shuts down the active client. + * + * A shut down begins the termination of communication. The underlying socket will coordinate a clean shutdown, and + * it is safe to close the socket once a recv with 0 size has returned or an appropriate timeout has been reached. * - * \param fd: file descriptor to shutdown + * \param socketDescriptor: socket descriptor to shutdown */ - virtual void shutdown(NATIVE_INT_TYPE fd); + void shutdown(const SocketDescriptor& socketDescriptor); PROTECTED: /** @@ -168,10 +169,10 @@ class IpSocket { /** * \brief setup the socket timeout properties of the opened outgoing socket - * \param socketFd: file descriptor to setup + * \param socketDescriptor: socket descriptor to setup * \return status of timeout setup */ - SocketIpStatus setupTimeouts(NATIVE_INT_TYPE socketFd); + SocketIpStatus setupTimeouts(PlatformIntType socketFd); /** * \brief converts a given address in dot form x.x.x.x to an ip address. ONLY works for IPv4. @@ -182,27 +183,27 @@ class IpSocket { static SocketIpStatus addressToIp4(const char* address, void* ip4); /** * \brief Protocol specific open implementation, called from open. - * \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid + * \param socketDescriptor: (output) socket descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid * \return status of open */ - virtual SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) = 0; + virtual SocketIpStatus openProtocol(SocketDescriptor& fd) = 0; /** * \brief Protocol specific implementation of send. Called directly with retry from send. - * \param fd: file descriptor to send to + * \param socketDescriptor: socket descriptor to send to * \param data: data to send * \param size: size of data to send * \return: size of data sent, or -1 on error. */ - virtual I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) = 0; + virtual I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) = 0; /** * \brief Protocol specific implementation of recv. Called directly with error handling from recv. - * \param fd: file descriptor to recv from + * \param socket: socket descriptor to recv from * \param data: data pointer to fill * \param size: size of data buffer * \return: size of data received, or -1 on error. */ - virtual I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) = 0; + virtual I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) = 0; U32 m_timeoutSeconds; U32 m_timeoutMicroseconds; diff --git a/Drv/Ip/SocketComponentHelper.cpp b/Drv/Ip/SocketComponentHelper.cpp index 56f0a4bdc0..6c6efa3232 100644 --- a/Drv/Ip/SocketComponentHelper.cpp +++ b/Drv/Ip/SocketComponentHelper.cpp @@ -15,11 +15,9 @@ #include #include -#define MAXIMUM_SIZE 0x7FFFFFFF - namespace Drv { -SocketComponentHelper::SocketComponentHelper() : m_fd(-1), m_reconnect(false), m_stop(false), m_started(false), m_open(false) {} +SocketComponentHelper::SocketComponentHelper() {} SocketComponentHelper::~SocketComponentHelper() {} @@ -29,61 +27,64 @@ void SocketComponentHelper::start(const Fw::StringBase &name, const Os::Task::ParamType stack, const Os::Task::ParamType cpuAffinity) { FW_ASSERT(m_task.getState() == Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times - FW_ASSERT(not this->m_stop); // It is a coding error to stop the thread before it is started + this->m_stop = false; m_reconnect = reconnect; // Note: the first step is for the IP socket to open the port Os::Task::Arguments arguments(name, SocketComponentHelper::readTask, this, priority, stack, cpuAffinity); Os::Task::Status stat = m_task.start(arguments); - FW_ASSERT(Os::Task::OP_OK == stat, static_cast(stat)); -} - -SocketIpStatus SocketComponentHelper::startup() { - this->m_started = true; - return this->getSocketHandler().startup(); -} - -bool SocketComponentHelper::isStarted() { - bool is_started = false; - is_started = this->m_started; - return is_started; + FW_ASSERT(Os::Task::OP_OK == stat, static_cast(stat)); } SocketIpStatus SocketComponentHelper::open() { - SocketIpStatus status = SOCK_FAILED_TO_GET_SOCKET; - NATIVE_INT_TYPE fd = -1; - this->m_lock.lock(); - if (not this->m_open) { - FW_ASSERT(this->m_fd == -1 and not this->m_open); // Ensure we are not opening an opened socket - status = this->getSocketHandler().open(fd); - this->m_fd = fd; - // Call connected any time the open is successful + SocketIpStatus status = SOCK_ANOTHER_THREAD_OPENING; + OpenState local_open = OpenState::OPEN; + // Scope to guard lock + { + Os::ScopeLock scopeLock(m_lock); + if (this->m_open == OpenState::NOT_OPEN) { + this->m_open = OpenState::OPENING; + local_open = this->m_open; + } else { + local_open = OpenState::SKIP; + } + + } + if (local_open == OpenState::OPENING) { + FW_ASSERT(this->m_descriptor.fd == -1); // Ensure we are not opening an opened socket + status = this->getSocketHandler().open(this->m_descriptor); + // Lock scope + { + Os::ScopeLock scopeLock(m_lock); + if (Drv::SOCK_SUCCESS == status) { + this->m_open = OpenState::OPEN; + } else { + this->m_open = OpenState::NOT_OPEN; + this->m_descriptor.fd = -1; + } + } + // Notify connection on success outside locked scope if (Drv::SOCK_SUCCESS == status) { - this->m_open = true; - this->m_lock.unlock(); this->connected(); - return status; } } - this->m_lock.unlock(); + return status; } bool SocketComponentHelper::isOpened() { - bool is_open = false; - this->m_lock.lock(); - is_open = this->m_open; - this->m_lock.unlock(); + Os::ScopeLock scopedLock(this->m_lock); + bool is_open = this->m_open == OpenState::OPEN; return is_open; } SocketIpStatus SocketComponentHelper::reconnect() { SocketIpStatus status = SOCK_SUCCESS; - - // Handle opening - // Open a network connection if it has not already been open - if (this->isStarted() and (not this->isOpened())) { + if (not this->isOpened()) { status = this->open(); + if (status == SocketIpStatus::SOCK_ANOTHER_THREAD_OPENING) { + status = SocketIpStatus::SOCK_SUCCESS; + } } return status; } @@ -91,20 +92,21 @@ SocketIpStatus SocketComponentHelper::reconnect() { SocketIpStatus SocketComponentHelper::send(const U8* const data, const U32 size) { SocketIpStatus status = SOCK_SUCCESS; this->m_lock.lock(); - NATIVE_INT_TYPE fd = this->m_fd; + SocketDescriptor descriptor = this->m_descriptor; this->m_lock.unlock(); // Prevent transmission before connection, or after a disconnect - if (fd == -1) { + if (descriptor.fd == -1) { status = this->reconnect(); // if reconnect wasn't successful, pass the that up to the caller if(status != SOCK_SUCCESS) { return status; } + // Refresh local copy after reconnect this->m_lock.lock(); - fd = this->m_fd; + descriptor = this->m_descriptor; this->m_lock.unlock(); } - status = this->getSocketHandler().send(fd, data, size); + status = this->getSocketHandler().send(descriptor, data, size); if (status == SOCK_DISCONNECTED) { this->close(); } @@ -112,21 +114,15 @@ SocketIpStatus SocketComponentHelper::send(const U8* const data, const U32 size) } void SocketComponentHelper::shutdown() { - this->m_lock.lock(); - this->getSocketHandler().shutdown(this->m_fd); - this->m_started = false; - this->m_fd = -1; - this->m_lock.unLock(); + Os::ScopeLock scopedLock(this->m_lock); + this->getSocketHandler().shutdown(this->m_descriptor); } void SocketComponentHelper::close() { - this->m_lock.lock(); - if (this->m_fd != -1) { - this->getSocketHandler().close(this->m_fd); - this->m_fd = -1; - } - this->m_open = false; - this->m_lock.unlock(); + Os::ScopeLock scopedLock(this->m_lock); + this->getSocketHandler().close(this->m_descriptor); + this->m_descriptor.fd = -1; + this->m_open = OpenState::NOT_OPEN; } Os::Task::Status SocketComponentHelper::join() { @@ -134,69 +130,79 @@ Os::Task::Status SocketComponentHelper::join() { } void SocketComponentHelper::stop() { - this->m_lock.lock(); - this->m_stop = true; - this->m_lock.unlock(); + // Scope to protect lock + { + Os::ScopeLock scopeLock(m_lock); + this->m_stop = true; + } this->shutdown(); // Break out of any receives and fully shutdown } +bool SocketComponentHelper::running() { + Os::ScopeLock scopedLock(this->m_lock); + bool running = not this->m_stop; + return running; +} + SocketIpStatus SocketComponentHelper::recv(U8* data, U32 &size) { SocketIpStatus status = SOCK_SUCCESS; // Check for previously disconnected socket this->m_lock.lock(); - NATIVE_INT_TYPE fd = this->m_fd; + SocketDescriptor descriptor = this->m_descriptor; this->m_lock.unlock(); - if (fd == -1) { + if (descriptor.fd == -1) { return SOCK_DISCONNECTED; } - status = this->getSocketHandler().recv(fd, data, size); + status = this->getSocketHandler().recv(descriptor, data, size); if (status == SOCK_DISCONNECTED) { this->close(); } return status; } -void SocketComponentHelper::readTask(void* pointer) { - FW_ASSERT(pointer); +void SocketComponentHelper::readLoop() { SocketIpStatus status = SOCK_SUCCESS; - SocketComponentHelper* self = reinterpret_cast(pointer); do { // Prevent transmission before connection, or after a disconnect - if ((not self->isOpened()) and (not self->m_stop)) { - status = self->reconnect(); - if(status != SOCK_SUCCESS) { - Fw::Logger::log( - "[WARNING] Failed to open port with status %d and errno %d\n", - status, - errno); - (void) Os::Task::delay(SOCKET_RETRY_INTERVAL); + if ((not this->isOpened()) and this->running()) { + status = this->reconnect(); + if (status != SOCK_SUCCESS) { + Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno); + (void)Os::Task::delay(SOCKET_RETRY_INTERVAL); continue; } } // If the network connection is open, read from it - if (self->isStarted() and self->isOpened() and (not self->m_stop)) { - Fw::Buffer buffer = self->getBuffer(); + if (this->isOpened() and this->running()) { + Fw::Buffer buffer = this->getBuffer(); U8* data = buffer.getData(); FW_ASSERT(data); U32 size = buffer.getSize(); // recv blocks, so it may have been a while since its done an isOpened check - status = self->recv(data, size); - if ((status != SOCK_SUCCESS) && (status != SOCK_INTERRUPTED_TRY_AGAIN && (status != SOCK_NO_DATA_AVAILABLE))) { + status = this->recv(data, size); + if ((status != SOCK_SUCCESS) && (status != SOCK_INTERRUPTED_TRY_AGAIN) && (status != SOCK_NO_DATA_AVAILABLE)) { Fw::Logger::log("[WARNING] Failed to recv from port with status %d and errno %d\n", - status, - errno); - self->close(); + status, + errno); + this->close(); buffer.setSize(0); } else { // Send out received data buffer.setSize(size); } - self->sendBuffer(buffer, status); + this->sendBuffer(buffer, status); } } // As long as not told to stop, and we are successful interrupted or ordered to retry, keep receiving - while (not self->m_stop && - (status == SOCK_SUCCESS || status == SOCK_INTERRUPTED_TRY_AGAIN || self->m_reconnect)); - self->shutdown(); // Shutdown the port entirely + while (this->running() && + (status == SOCK_SUCCESS || (status == SOCK_NO_DATA_AVAILABLE) || status == SOCK_INTERRUPTED_TRY_AGAIN || this->m_reconnect)); + // Close the socket + this->close(); // Close the port entirely +} + +void SocketComponentHelper::readTask(void* pointer) { + FW_ASSERT(pointer); + SocketComponentHelper* self = reinterpret_cast(pointer); + self->readLoop(); } } // namespace Drv diff --git a/Drv/Ip/SocketComponentHelper.hpp b/Drv/Ip/SocketComponentHelper.hpp index e2604705d2..a2a25a0209 100644 --- a/Drv/Ip/SocketComponentHelper.hpp +++ b/Drv/Ip/SocketComponentHelper.hpp @@ -27,6 +27,12 @@ namespace Drv { */ class SocketComponentHelper { public: + enum OpenState{ + NOT_OPEN, + OPENING, + OPEN, + SKIP + }; /** * \brief constructs the socket read task */ @@ -56,25 +62,6 @@ class SocketComponentHelper { const Os::Task::ParamType stack = Os::Task::TASK_DEFAULT, const Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT); - /** - * \brief startup the socket for communications - * - * Status of the socket handler. - * - * Note: this just delegates to the handler - * - * \return status of open, SOCK_SUCCESS for success, something else on error - */ - SocketIpStatus startup(); - - /** - * \brief Returns true when the socket is started - * - * Returns true when the socket is started up sufficiently to be actively listening to clients. Returns false - * otherwise. This means `startup()` was called and returned success. - */ - bool isStarted(); - /** * \brief open the socket for communications * @@ -129,9 +116,7 @@ class SocketComponentHelper { /** * \brief close the socket communications * - * Typically stopping the socket read task will shutdown the connection. However, in cases where the read task - * will not be started, this function may be used to close the socket. This calls a full `close` on the client - * socket. + * Close the client connection. This will ensure that the resources used are cleaned-up. * * Note: this just delegates to the handler */ @@ -140,14 +125,18 @@ class SocketComponentHelper { /** * \brief shutdown the socket communications * - * Typically stopping the socket read task will shutdown the connection. However, in cases where the read task - * will not be started, this function may be used to close the socket. This calls a full `shutdown` on the client - * socket. + * Shutdown communication. This will begin the process of cleanly closing communications. This process will be + * finished with a receive of 0 size and should be followed by a close. * * Note: this just delegates to the handler */ void shutdown(); + /** + * \brief is the read loop running + */ + bool running(); + /** * \brief stop the socket read task and close the associated socket. * @@ -166,8 +155,11 @@ class SocketComponentHelper { */ Os::Task::Status join(); - PROTECTED: + /** + * \brief receive off the TCP socket + */ + virtual void readLoop(); /** * \brief returns a reference to the socket handler * @@ -209,6 +201,7 @@ class SocketComponentHelper { */ virtual void connected() = 0; + /** * \brief a task designed to read from the socket and output incoming data * @@ -218,13 +211,10 @@ class SocketComponentHelper { Os::Task m_task; Os::Mutex m_lock; - NATIVE_INT_TYPE m_fd; - bool m_reconnect; //!< Force reconnection - bool m_stop; //!< Stops the task when set to true - bool m_started; //!< Have we successfully started the socket - bool m_open; //!< Have we successfully opened - - + SocketDescriptor m_descriptor; + bool m_reconnect = false; //!< Force reconnection + bool m_stop = true; //!< Stops the task when set to true + OpenState m_open = OpenState::NOT_OPEN; //!< Have we successfully opened }; } #endif // DRV_SocketComponentHelper_HPP diff --git a/Drv/Ip/TcpClientSocket.cpp b/Drv/Ip/TcpClientSocket.cpp index 8ad574badd..0cad233748 100644 --- a/Drv/Ip/TcpClientSocket.cpp +++ b/Drv/Ip/TcpClientSocket.cpp @@ -47,7 +47,7 @@ bool TcpClientSocket::isValidPort(U16 port) { } -SocketIpStatus TcpClientSocket::openProtocol(NATIVE_INT_TYPE& fd) { +SocketIpStatus TcpClientSocket::openProtocol(SocketDescriptor& socketDescriptor) { NATIVE_INT_TYPE socketFd = -1; struct sockaddr_in address; @@ -81,17 +81,17 @@ SocketIpStatus TcpClientSocket::openProtocol(NATIVE_INT_TYPE& fd) { ::close(socketFd); return SOCK_FAILED_TO_CONNECT; } - fd = socketFd; + socketDescriptor.fd = socketFd; Fw::Logger::log("Connected to %s:%hu as a tcp client\n", m_hostname, m_port); return SOCK_SUCCESS; } -I32 TcpClientSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) { - return static_cast(::send(fd, data, size, SOCKET_IP_SEND_FLAGS)); +I32 TcpClientSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) { + return static_cast(::send(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS)); } -I32 TcpClientSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) { - return static_cast(::recv(fd, data, size, SOCKET_IP_RECV_FLAGS)); +I32 TcpClientSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) { + return static_cast(::recv(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS)); } } // namespace Drv diff --git a/Drv/Ip/TcpClientSocket.hpp b/Drv/Ip/TcpClientSocket.hpp index cd12f58cc6..c5a800daa0 100644 --- a/Drv/Ip/TcpClientSocket.hpp +++ b/Drv/Ip/TcpClientSocket.hpp @@ -44,26 +44,26 @@ class TcpClientSocket : public IpSocket { /** * \brief Tcp specific implementation for opening a client socket. - * \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid + * \param socketDescriptor: (output) descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid * \return status of open */ - SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override; + SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override; /** * \brief Protocol specific implementation of send. Called directly with retry from send. - * \param fd: file descriptor to send to + * \param socketDescriptor: descriptor to send to * \param data: data to send * \param size: size of data to send * \return: size of data sent, or -1 on error. */ - I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override; + I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override; /** * \brief Protocol specific implementation of recv. Called directly with error handling from recv. - * \param fd: file descriptor to recv from + * \param socketDescriptor: descriptor to recv from * \param data: data pointer to fill * \param size: size of data buffer * \return: size of data received, or -1 on error. */ - I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override; + I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override; }; } // namespace Drv diff --git a/Drv/Ip/TcpServerSocket.cpp b/Drv/Ip/TcpServerSocket.cpp index 5f29658959..d0ad011586 100644 --- a/Drv/Ip/TcpServerSocket.cpp +++ b/Drv/Ip/TcpServerSocket.cpp @@ -38,15 +38,15 @@ namespace Drv { -TcpServerSocket::TcpServerSocket() : IpSocket(), m_base_fd(-1) {} +TcpServerSocket::TcpServerSocket() : IpSocket() {} U16 TcpServerSocket::getListenPort() { U16 port = this->m_port; return port; } -SocketIpStatus TcpServerSocket::startup() { - NATIVE_INT_TYPE serverFd = -1; +SocketIpStatus TcpServerSocket::startup(SocketDescriptor& socketDescriptor) { + PlatformIntType serverFd = -1; struct sockaddr_in address; // Acquire a socket, or return error if ((serverFd = ::socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -77,34 +77,30 @@ SocketIpStatus TcpServerSocket::startup() { ::close(serverFd); return SOCK_FAILED_TO_READ_BACK_PORT; } - U16 port = ntohs(address.sin_port); - Fw::Logger::log("Listening for single client at %s:%hu\n", m_hostname, port); - // TCP requires listening on the socket. Since we only expect a single client, set the TCP backlog (second argument) to 1 to prevent queuing of multiple clients. + // TCP requires listening on the socket. Since we only expect a single client, set the TCP backlog (second argument) to 1 to prevent queuing of multiple clients. if (::listen(serverFd, 1) < 0) { ::close(serverFd); return SOCK_FAILED_TO_LISTEN; // What we have here is a failure to communicate } - - m_base_fd = serverFd; - m_port = port; - - return this->IpSocket::startup(); + Fw::Logger::log("Listening for single client at %s:%hu\n", m_hostname, m_port); + FW_ASSERT(serverFd != -1); + socketDescriptor.serverFd = serverFd; + this->m_port = ntohs(address.sin_port); + return SOCK_SUCCESS; } -void TcpServerSocket::shutdown(NATIVE_INT_TYPE fd) { - if (this->m_base_fd != -1) { - (void)::shutdown(this->m_base_fd, SHUT_RDWR); - (void)::close(this->m_base_fd); - this->m_base_fd = -1; - } - this->IpSocket::shutdown(fd); +void TcpServerSocket::terminate(const SocketDescriptor& socketDescriptor) { + (void)::close(socketDescriptor.serverFd); } -SocketIpStatus TcpServerSocket::openProtocol(NATIVE_INT_TYPE& fd) { - NATIVE_INT_TYPE clientFd = -1; - NATIVE_INT_TYPE serverFd = -1; +SocketIpStatus TcpServerSocket::openProtocol(SocketDescriptor& socketDescriptor) { + PlatformIntType clientFd = -1; + PlatformIntType serverFd = socketDescriptor.serverFd; - serverFd = this->m_base_fd; + // Check for not started yet, may be true in the case of start-up reconnect attempts + if (serverFd == -1) { + return SOCK_NOT_STARTED; + } // TCP requires accepting on the socket to get the client socket file descriptor. clientFd = ::accept(serverFd, nullptr, nullptr); @@ -118,18 +114,18 @@ SocketIpStatus TcpServerSocket::openProtocol(NATIVE_INT_TYPE& fd) { } Fw::Logger::log("Accepted client at %s:%hu\n", m_hostname, m_port); - fd = clientFd; + socketDescriptor.fd = clientFd; return SOCK_SUCCESS; } -I32 TcpServerSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) { - return static_cast(::send(fd, data, size, SOCKET_IP_SEND_FLAGS)); +I32 TcpServerSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) { + return static_cast(::send(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS)); } -I32 TcpServerSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) { +I32 TcpServerSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) { I32 size_buf; // recv will return 0 if the client has done an orderly shutdown - size_buf = static_cast(::recv(fd, data, size, SOCKET_IP_RECV_FLAGS)); + size_buf = static_cast(::recv(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS)); return size_buf; } diff --git a/Drv/Ip/TcpServerSocket.hpp b/Drv/Ip/TcpServerSocket.hpp index 6e639d15c1..ae623df293 100644 --- a/Drv/Ip/TcpServerSocket.hpp +++ b/Drv/Ip/TcpServerSocket.hpp @@ -17,6 +17,7 @@ #include namespace Drv { + /** * \brief Helper for setting up Tcp using Berkeley sockets as a server * @@ -36,18 +37,20 @@ class TcpServerSocket : public IpSocket { * Opens the server's listening socket such that this server can listen for incoming client requests. Given the * nature of this component, only one (1) client can be handled at a time. After this call succeeds, clients may * connect. This call does not block, block occurs on `open` while waiting to accept incoming clients. + * \param socketDescriptor: server descriptor will be written here * \return status of the server socket setup. */ - SocketIpStatus startup() override; + SocketIpStatus startup(SocketDescriptor& socketDescriptor); /** - * \brief Shutdown and close the server socket followed by the open client + * \brief close the server socket created by the `startup` call + * + * Calls the close function on the server socket. No shutdown is performed on the server socket, as that is left to + * the individual client sockets. * - * \param fd: file descriptor to shutdown - * First, this calls `shutdown` and `close` on the server socket and then calls the close method to `shutdown` and - * `close` the client. + * \param socketDescriptor: descriptor to close */ - void shutdown(NATIVE_INT_TYPE fd) override; + void terminate(const SocketDescriptor& socketDescriptor); /** * \brief get the port being listened on @@ -62,28 +65,29 @@ class TcpServerSocket : public IpSocket { PROTECTED: /** * \brief Tcp specific implementation for opening a client socket connected to this server. - * \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid + * \param socketDescriptor: (output) descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid * \return status of open */ - SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override; + SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override; /** * \brief Protocol specific implementation of send. Called directly with retry from send. - * \param fd: file descriptor to send to + * \param socketDescriptor: descriptor to send to * \param data: data to send * \param size: size of data to send * \return: size of data sent, or -1 on error. */ - I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override; + I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override; /** * \brief Protocol specific implementation of recv. Called directly with error handling from recv. - * \param fd: file descriptor to recv from + * \param socketDescriptor: descriptor to recv from * \param data: data pointer to fill * \param size: size of data buffer * \return: size of data received, or -1 on error. */ - I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override; - PRIVATE: - NATIVE_INT_TYPE m_base_fd; //!< File descriptor of the listening socket + I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override; + + + }; } // namespace Drv diff --git a/Drv/Ip/UdpSocket.cpp b/Drv/Ip/UdpSocket.cpp index 59ca76e5bf..9d8cb3f0e8 100644 --- a/Drv/Ip/UdpSocket.cpp +++ b/Drv/Ip/UdpSocket.cpp @@ -80,9 +80,9 @@ U16 UdpSocket::getRecvPort() { } -SocketIpStatus UdpSocket::bind(NATIVE_INT_TYPE fd) { +SocketIpStatus UdpSocket::bind(const PlatformIntType fd) { struct sockaddr_in address; - FW_ASSERT(-1 != fd); + FW_ASSERT(fd != -1); // Set up the address port and name address.sin_family = AF_INET; @@ -112,7 +112,7 @@ SocketIpStatus UdpSocket::bind(NATIVE_INT_TYPE fd) { return SOCK_SUCCESS; } -SocketIpStatus UdpSocket::openProtocol(NATIVE_INT_TYPE& fd) { +SocketIpStatus UdpSocket::openProtocol(SocketDescriptor& socketDescriptor) { SocketIpStatus status = SOCK_SUCCESS; NATIVE_INT_TYPE socketFd = -1; struct sockaddr_in address; @@ -169,19 +169,19 @@ SocketIpStatus UdpSocket::openProtocol(NATIVE_INT_TYPE& fd) { port); } FW_ASSERT(status == SOCK_SUCCESS, status); - fd = socketFd; + socketDescriptor.fd = socketFd; return status; } -I32 UdpSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) { +I32 UdpSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) { FW_ASSERT(this->m_state->m_addr_send.sin_family != 0); // Make sure the address was previously setup - return static_cast(::sendto(fd, data, size, SOCKET_IP_SEND_FLAGS, + return static_cast(::sendto(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS, reinterpret_cast(&this->m_state->m_addr_send), sizeof(this->m_state->m_addr_send))); } -I32 UdpSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) { +I32 UdpSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) { FW_ASSERT(this->m_state->m_addr_recv.sin_family != 0); // Make sure the address was previously setup - return static_cast(::recvfrom(fd, data, size, SOCKET_IP_RECV_FLAGS, nullptr, nullptr)); + return static_cast(::recvfrom(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS, nullptr, nullptr)); } } // namespace Drv diff --git a/Drv/Ip/UdpSocket.hpp b/Drv/Ip/UdpSocket.hpp index 06d21e7057..76d90d845b 100644 --- a/Drv/Ip/UdpSocket.hpp +++ b/Drv/Ip/UdpSocket.hpp @@ -18,9 +18,6 @@ namespace Drv { -/** - * \brief a structure used to hold the encapsulated socket state to prevent namespace collision - */ struct SocketState; /** @@ -89,32 +86,32 @@ class UdpSocket : public IpSocket { /** * \brief bind the UDP to a port such that it can receive packets at the previously configured port - * \param fd: socket file descriptor used in bind + * \param socketDescriptor: socket descriptor used in bind * \return status of the bind */ - SocketIpStatus bind(NATIVE_INT_TYPE fd); + SocketIpStatus bind(const PlatformIntType fd); /** * \brief udp specific implementation for opening a socket. - * \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid + * \param socketDescriptor: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid * \return status of open */ - SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override; + SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override; /** * \brief Protocol specific implementation of send. Called directly with retry from send. - * \param fd: file descriptor to send to + * \param socketDescriptor: descriptor to send to * \param data: data to send * \param size: size of data to send * \return: size of data sent, or -1 on error. */ - I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override; + I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override; /** * \brief Protocol specific implementation of recv. Called directly with error handling from recv. - * \param fd: file descriptor to recv from + * \param socketDescriptor: descriptor to recv from * \param data: data pointer to fill * \param size: size of data buffer * \return: size of data received, or -1 on error. */ - I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override; + I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override; private: SocketState* m_state; //!< State storage U16 m_recv_port; //!< IP address port used diff --git a/Drv/Ip/test/ut/SocketTestHelper.cpp b/Drv/Ip/test/ut/SocketTestHelper.cpp index dbf7c6c346..43c2400136 100644 --- a/Drv/Ip/test/ut/SocketTestHelper.cpp +++ b/Drv/Ip/test/ut/SocketTestHelper.cpp @@ -43,21 +43,46 @@ void validate_random_buffer(Fw::Buffer &buffer, U8 *data) { buffer.setSize(0); } -void fill_random_buffer(Fw::Buffer &buffer) { +U32 fill_random_buffer(Fw::Buffer &buffer) { buffer.setSize(STest::Pick::lowerUpper(1, buffer.getSize())); fill_random_data(buffer.getData(), buffer.getSize()); + return static_cast(buffer.getSize()); } -void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, NATIVE_INT_TYPE sender_fd, NATIVE_INT_TYPE receiver_fd) { +void drain(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd) { + Drv::SocketIpStatus status = SOCK_SUCCESS; + // Drain the server in preparation for close + while (status == Drv::SOCK_SUCCESS || status == Drv::SOCK_NO_DATA_AVAILABLE) { + U8 buffer[1]; + U32 size = sizeof buffer; + status = receiver.recv(receiver_fd, buffer, size); + } + ASSERT_EQ(status, Drv::SocketIpStatus::SOCK_DISCONNECTED) << "Socket did not disconnect as expected"; +} + +void receive_all(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd, U8* buffer, U32 size) { + ASSERT_NE(buffer, nullptr); + U32 received_size = 0; + Drv::SocketIpStatus status; + do { + U32 size_in_out = size - received_size; + status = receiver.recv(receiver_fd, buffer + received_size, size_in_out); + ASSERT_TRUE((status == Drv::SOCK_NO_DATA_AVAILABLE || status == Drv::SOCK_SUCCESS)); + received_size += size_in_out; + } while (size > received_size); + EXPECT_EQ(received_size, size); +} + +void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, Drv::SocketDescriptor& sender_fd, Drv::SocketDescriptor& receiver_fd) { U32 size = MAX_DRV_TEST_MESSAGE_SIZE; + U8 buffer_out[MAX_DRV_TEST_MESSAGE_SIZE] = {0}; U8 buffer_in[MAX_DRV_TEST_MESSAGE_SIZE] = {0}; // Send receive validate block Drv::Test::fill_random_data(buffer_out, MAX_DRV_TEST_MESSAGE_SIZE); EXPECT_EQ(sender.send(sender_fd, buffer_out, MAX_DRV_TEST_MESSAGE_SIZE), Drv::SOCK_SUCCESS); - EXPECT_EQ(receiver.recv(receiver_fd, buffer_in, size), Drv::SOCK_SUCCESS); - EXPECT_EQ(size, static_cast(MAX_DRV_TEST_MESSAGE_SIZE)); + receive_all(receiver, receiver_fd, buffer_in, size); Drv::Test::validate_random_data(buffer_out, buffer_in, MAX_DRV_TEST_MESSAGE_SIZE); } diff --git a/Drv/Ip/test/ut/SocketTestHelper.hpp b/Drv/Ip/test/ut/SocketTestHelper.hpp index 9b4d069a91..6b93ed1f7d 100644 --- a/Drv/Ip/test/ut/SocketTestHelper.hpp +++ b/Drv/Ip/test/ut/SocketTestHelper.hpp @@ -47,7 +47,7 @@ void validate_random_buffer(Fw::Buffer &buffer, U8 *data); * Fill random data into the buffer (using a random length). * @param buffer: buffer to fill. */ -void fill_random_buffer(Fw::Buffer &buffer); +U32 fill_random_buffer(Fw::Buffer &buffer); /** * Send/receive pair. @@ -56,7 +56,23 @@ void fill_random_buffer(Fw::Buffer &buffer); * @param sender_fd: file descriptor for sender * @param receiver_fd: file descriptor for receiver */ -void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, NATIVE_INT_TYPE sender_fd, NATIVE_INT_TYPE receiver_fd); +void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, Drv::SocketDescriptor& sender_fd, Drv::SocketDescriptor& receiver_fd); + +/** + * Drain bytes from the socket until disconnect received. + * @warning: must have called shutdown on the remote before calling this + * @param drain_fd: file descriptor for draining + */ +void drain(Drv::IpSocket& receiver, Drv::SocketDescriptor& drain_fd); + +/** + * Receive all data, reassembling the frame + * @param receiver: receiver + * @param receiver_fd: receiver descriptor + * @param buffer: buffer + * @param size: size to receive + */ +void receive_all(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd, U8* buffer, U32 size); /** * Wait on socket change. diff --git a/Drv/Ip/test/ut/TestTcp.cpp b/Drv/Ip/test/ut/TestTcp.cpp index 64ad86dcbe..8641f9945d 100644 --- a/Drv/Ip/test/ut/TestTcp.cpp +++ b/Drv/Ip/test/ut/TestTcp.cpp @@ -19,11 +19,11 @@ void test_with_loop(U32 iterations) { U16 port = 0; // Choose a port Drv::TcpServerSocket server; - NATIVE_INT_TYPE server_fd = -1; - NATIVE_INT_TYPE client_fd = -1; + Drv::SocketDescriptor server_fd; + Drv::SocketDescriptor client_fd; server.configure("127.0.0.1", port, 0, 100); - EXPECT_EQ(server.startup(), Drv::SOCK_SUCCESS); - Drv::Test::force_recv_timeout(server_fd, server); + EXPECT_EQ(server.startup(server_fd), Drv::SOCK_SUCCESS); + Drv::Test::force_recv_timeout(server_fd.fd, server); // Loop through a bunch of client disconnects for (U32 i = 0; i < iterations; i++) { @@ -31,7 +31,7 @@ void test_with_loop(U32 iterations) { client.configure("127.0.0.1", server.getListenPort(),0,100); // client_fd gets assigned a real value here status1 = client.open(client_fd); - EXPECT_EQ(status1, Drv::SOCK_SUCCESS); + EXPECT_EQ(status1, Drv::SOCK_SUCCESS) << "With errno: " << errno; // client_fd gets assigned a real value here status2 = server.open(server_fd); @@ -41,15 +41,18 @@ void test_with_loop(U32 iterations) { // If all the opens worked, then run this if (Drv::SOCK_SUCCESS == status1 && Drv::SOCK_SUCCESS == status2) { // Force the sockets not to hang, if at all possible - Drv::Test::force_recv_timeout(client_fd, client); - Drv::Test::force_recv_timeout(server_fd, server); + Drv::Test::force_recv_timeout(client_fd.fd, client); + Drv::Test::force_recv_timeout(server_fd.fd, server); Drv::Test::send_recv(server, client, server_fd, client_fd); Drv::Test::send_recv(client, server, client_fd, server_fd); } - client.close(client_fd); + server.shutdown(client_fd); + // Drain the server before close + Drv::Test::drain(server, server_fd); server.close(server_fd); + client.close(client_fd); } - server.shutdown(server_fd); + server.terminate(server_fd); } diff --git a/Drv/Ip/test/ut/TestUdp.cpp b/Drv/Ip/test/ut/TestUdp.cpp index 95b97d1de0..d7f02471ca 100644 --- a/Drv/Ip/test/ut/TestUdp.cpp +++ b/Drv/Ip/test/ut/TestUdp.cpp @@ -15,8 +15,8 @@ void test_with_loop(U32 iterations, bool duplex) { Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS; Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS; - NATIVE_INT_TYPE udp1_fd = -1; - NATIVE_INT_TYPE udp2_fd = -1; + Drv::SocketDescriptor udp1_fd; + Drv::SocketDescriptor udp2_fd; U16 port1 = Drv::Test::get_free_port(true); ASSERT_NE(0, port1); @@ -49,8 +49,8 @@ void test_with_loop(U32 iterations, bool duplex) { // If all the opens worked, then run this if (Drv::SOCK_SUCCESS == status1 && Drv::SOCK_SUCCESS == status2) { // Force the sockets not to hang, if at all possible - Drv::Test::force_recv_timeout(udp1_fd, udp1); - Drv::Test::force_recv_timeout(udp2_fd, udp2); + Drv::Test::force_recv_timeout(udp1_fd.fd, udp1); + Drv::Test::force_recv_timeout(udp2_fd.fd, udp2); Drv::Test::send_recv(udp1, udp2, udp1_fd, udp2_fd); // Allow duplex connections if (duplex) { diff --git a/Drv/TcpClient/TcpClientComponentImpl.cpp b/Drv/TcpClient/TcpClientComponentImpl.cpp index 89dc366a40..8a89923f7a 100644 --- a/Drv/TcpClient/TcpClientComponentImpl.cpp +++ b/Drv/TcpClient/TcpClientComponentImpl.cpp @@ -23,8 +23,7 @@ namespace Drv { // ---------------------------------------------------------------------- TcpClientComponentImpl::TcpClientComponentImpl(const char* const compName) - : TcpClientComponentBase(compName), - SocketComponentHelper() {} + : TcpClientComponentBase(compName) {} SocketIpStatus TcpClientComponentImpl::configure(const char* hostname, const U16 port, @@ -35,7 +34,6 @@ SocketIpStatus TcpClientComponentImpl::configure(const char* hostname, // Check that ensures the configured buffer size fits within the limits fixed-width type, U32 FW_ASSERT(buffer_size <= std::numeric_limits::max(), static_cast(buffer_size)); m_allocation_size = buffer_size; // Store the buffer size - (void)startup(); return m_socket.configure(hostname, port, send_timeout_seconds, send_timeout_microseconds); } diff --git a/Drv/TcpClient/test/ut/TcpClientTestMain.cpp b/Drv/TcpClient/test/ut/TcpClientTestMain.cpp index 9f5c7e0683..92e21272ad 100644 --- a/Drv/TcpClient/test/ut/TcpClientTestMain.cpp +++ b/Drv/TcpClient/test/ut/TcpClientTestMain.cpp @@ -5,22 +5,22 @@ #include "TcpClientTester.hpp" -TEST(Nominal, BasicMessaging) { +TEST(Nominal, TcpClientBasicMessaging) { Drv::TcpClientTester tester; tester.test_basic_messaging(); } -TEST(Nominal, BasicReceiveThread) { +TEST(Nominal, TcpClientBasicReceiveThread) { Drv::TcpClientTester tester; tester.test_receive_thread(); } -TEST(Reconnect, MultiMessaging) { +TEST(Reconnect, TcpClientMultiMessaging) { Drv::TcpClientTester tester; tester.test_multiple_messaging(); } -TEST(Reconnect, ReceiveThreadReconnect) { +TEST(Reconnect, TcpClientReceiveThreadReconnect) { Drv::TcpClientTester tester; tester.test_advanced_reconnect(); } diff --git a/Drv/TcpClient/test/ut/TcpClientTester.cpp b/Drv/TcpClient/test/ut/TcpClientTester.cpp index edea68e2d7..33f9c28fb5 100644 --- a/Drv/TcpClient/test/ut/TcpClientTester.cpp +++ b/Drv/TcpClient/test/ut/TcpClientTester.cpp @@ -33,9 +33,9 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) { Drv::TcpServerSocket server; server.configure("127.0.0.1", port, 0, 100); - NATIVE_INT_TYPE client_fd = -1; + Drv::SocketDescriptor server_fd; - serverStat = server.startup(); + serverStat = server.startup(server_fd); this->component.configure("127.0.0.1", server.getListenPort(), 0, 100); ASSERT_EQ(serverStat, SOCK_SUCCESS) @@ -60,36 +60,28 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) { } EXPECT_TRUE(this->component.isOpened()); // fd has now been updated to be a value we need to keep track of - status2 = server.open(client_fd); + status2 = server.open(server_fd); EXPECT_EQ(status1, Drv::SOCK_SUCCESS); EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - status2 = Drv::SOCK_NO_DATA_AVAILABLE; - // If all the opens worked, then run this if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) && (this->component.isOpened())) { // Force the sockets not to hang, if at all possible - Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler()); - Drv::Test::force_recv_timeout(server.m_base_fd, server); + Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler()); + Drv::Test::force_recv_timeout(server_fd.serverFd, server); m_data_buffer.setSize(sizeof(m_data_storage)); - Drv::Test::fill_random_buffer(m_data_buffer); + size = Drv::Test::fill_random_buffer(m_data_buffer); Drv::SendStatus status = invoke_to_send(0, m_data_buffer); EXPECT_EQ(status, SendStatus::SEND_OK); - U16 counter = 0; - while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) { - status2 = server.recv(client_fd, buffer, size); - counter++; - } - EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - EXPECT_EQ(size, m_data_buffer.getSize()); + Drv::Test::receive_all(server, server_fd, buffer, size); Drv::Test::validate_random_buffer(m_data_buffer, buffer); // If receive thread is live, try the other way if (recv_thread) { m_spinner = false; m_data_buffer.setSize(sizeof(m_data_storage)); - status2 = server.send(client_fd, m_data_buffer.getData(), m_data_buffer.getSize()); + status2 = server.send(server_fd, m_data_buffer.getData(), m_data_buffer.getSize()); EXPECT_EQ(status2, Drv::SOCK_SUCCESS); from_deallocate_handler(0, m_data_buffer); while (not m_spinner) {} @@ -100,11 +92,16 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) { this->component.stop(); this->component.join(); } else { + // Client should close to initiate a clean shutdown + // This is because the server "can't know" if the client is done until + // this close is hit, or the server initiates the shutdown. this->component.close(); } - server.close(client_fd); + // Safe server shutdown after client + Drv::Test::drain(server, server_fd); + server.close(server_fd); } - server.shutdown(client_fd); + server.terminate(server_fd); ASSERT_from_ready_SIZE(iterations); } diff --git a/Drv/TcpServer/TcpServerComponentImpl.cpp b/Drv/TcpServer/TcpServerComponentImpl.cpp index 43b78234b7..ce6502fd88 100644 --- a/Drv/TcpServer/TcpServerComponentImpl.cpp +++ b/Drv/TcpServer/TcpServerComponentImpl.cpp @@ -13,6 +13,7 @@ #include #include #include "Fw/Types/Assert.hpp" +#include "Fw/Logger/Logger.hpp" namespace Drv { @@ -21,8 +22,7 @@ namespace Drv { // ---------------------------------------------------------------------- TcpServerComponentImpl::TcpServerComponentImpl(const char* const compName) - : TcpServerComponentBase(compName), - SocketComponentHelper() {} + : TcpServerComponentBase(compName) {} SocketIpStatus TcpServerComponentImpl::configure(const char* hostname, const U16 port, @@ -70,6 +70,48 @@ void TcpServerComponentImpl::connected() { } } +bool TcpServerComponentImpl::isStarted() { + Os::ScopeLock scopedLock(this->m_lock); + return this->m_descriptor.serverFd != -1; +} + +SocketIpStatus TcpServerComponentImpl::startup() { + Os::ScopeLock scopedLock(this->m_lock); + Drv::SocketIpStatus status = SOCK_SUCCESS; + // Prevent multiple startup attempts + if (this->m_descriptor.serverFd == -1) { + status = this->m_socket.startup(this->m_descriptor); + } + return status; +} + +void TcpServerComponentImpl::terminate() { + Os::ScopeLock scopedLock(this->m_lock); + this->m_socket.terminate(this->m_descriptor); + this->m_descriptor.serverFd = -1; +} + +void TcpServerComponentImpl::readLoop() { + Drv::SocketIpStatus status = Drv::SocketIpStatus::SOCK_NOT_STARTED; + // Keep trying to reconnect until the status is good, told to stop, or reconnection is turned off + do { + status = this->startup(); + if (status != SOCK_SUCCESS) { + Fw::Logger::log("[WARNING] Failed to listen on port %hu with status %d\n", this->getListenPort(), status); + (void)Os::Task::delay(SOCKET_RETRY_INTERVAL); + continue; + } + } + while (this->running() && status != SOCK_SUCCESS && this->m_reconnect); + // If start up was successful then perform normal operations + if (this->running() && status == SOCK_SUCCESS) { + // Perform the nominal read loop + SocketComponentHelper::readLoop(); + // Terminate the server + this->terminate(); + } +} + // ---------------------------------------------------------------------- // Handler implementations for user-defined typed input ports // ---------------------------------------------------------------------- diff --git a/Drv/TcpServer/TcpServerComponentImpl.hpp b/Drv/TcpServer/TcpServerComponentImpl.hpp index 035087f7e5..1a9fe9497c 100644 --- a/Drv/TcpServer/TcpServerComponentImpl.hpp +++ b/Drv/TcpServer/TcpServerComponentImpl.hpp @@ -62,6 +62,26 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo const U32 send_timeout_seconds = SOCKET_SEND_TIMEOUT_SECONDS, const U32 send_timeout_microseconds = SOCKET_SEND_TIMEOUT_MICROSECONDS); + /** + * \brief is started + */ + bool isStarted(); + + /** + * \brief startup the server socket for communications + * + * Start up the server socket by listening on a port. Note: does not accept clients, this is done in open to + * facilitate re-connection of clients. + */ + SocketIpStatus startup(); + + /** + * \brief terminate the server socket + * + * Close the server socket. Should be done after all clients are shutdown and closed. + */ + void terminate(); + /** * \brief get the port being listened on * @@ -85,7 +105,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo * * \return IpSocket reference */ - IpSocket& getSocketHandler(); + IpSocket& getSocketHandler() override; /** * \brief returns a buffer to fill with data @@ -95,7 +115,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo * * \return Fw::Buffer to fill with data */ - Fw::Buffer getBuffer(); + Fw::Buffer getBuffer() override; /** * \brief sends a buffer to be filled with data @@ -105,13 +125,17 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo * * \return Fw::Buffer filled with data to send out */ - void sendBuffer(Fw::Buffer buffer, SocketIpStatus status); + void sendBuffer(Fw::Buffer buffer, SocketIpStatus status) override; /** * \brief called when the IPv4 system has been connected */ - void connected(); + void connected() override; + /** + * \brief read from the socket, overridden to start and terminate the server socket + */ + void readLoop() override; PRIVATE: @@ -134,7 +158,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo * \param fwBuffer: buffer containing data to be sent * \return SEND_OK on success, SEND_RETRY when critical data should be retried and SEND_ERROR upon error */ - Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer); + Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) override; Drv::TcpServerSocket m_socket; //!< Socket implementation }; diff --git a/Drv/TcpServer/test/ut/TcpServerTestMain.cpp b/Drv/TcpServer/test/ut/TcpServerTestMain.cpp index fdb0883ee1..1500241c47 100644 --- a/Drv/TcpServer/test/ut/TcpServerTestMain.cpp +++ b/Drv/TcpServer/test/ut/TcpServerTestMain.cpp @@ -4,22 +4,22 @@ #include "TcpServerTester.hpp" -TEST(Nominal, BasicMessaging) { +TEST(Nominal, TcpServerBasicMessaging) { Drv::TcpServerTester tester; tester.test_basic_messaging(); } -TEST(Nominal, BasicReceiveThread) { +TEST(Nominal, TcpServerBasicReceiveThread) { Drv::TcpServerTester tester; tester.test_receive_thread(); } -TEST(Reconnect, MultiMessaging) { +TEST(Reconnect, TcpServerMultiMessaging) { Drv::TcpServerTester tester; tester.test_multiple_messaging(); } -TEST(Reconnect, ReceiveThreadReconnect) { +TEST(Reconnect, TcpServerReceiveThreadReconnect) { Drv::TcpServerTester tester; tester.test_advanced_reconnect(); } diff --git a/Drv/TcpServer/test/ut/TcpServerTester.cpp b/Drv/TcpServer/test/ut/TcpServerTester.cpp index b820610c0c..a01ad660e5 100644 --- a/Drv/TcpServer/test/ut/TcpServerTester.cpp +++ b/Drv/TcpServer/test/ut/TcpServerTester.cpp @@ -27,10 +27,9 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) { U8 buffer[sizeof(m_data_storage)] = {}; Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS; Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS; - Drv::SocketIpStatus serverStat = Drv::SOCK_SUCCESS; U16 port = 0; - NATIVE_INT_TYPE client_fd = -1; + Drv::SocketDescriptor client_fd; status1 = this->component.configure("127.0.0.1", port, 0, 100); EXPECT_EQ(status1, Drv::SOCK_SUCCESS); @@ -41,46 +40,45 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) { EXPECT_TRUE(this->wait_on_started(true, Drv::Test::get_configured_delay_ms()/10 + 1)); } EXPECT_TRUE(component.isStarted()); - // Loop through a bunch of client disconnects - for (U32 i = 0; i < iterations && serverStat == SOCK_SUCCESS; i++) { + for (U32 i = 0; i < iterations && status1 == SOCK_SUCCESS; i++) { Drv::TcpClientSocket client; client.configure("127.0.0.1", this->component.getListenPort(), 0, 100); status2 = client.open(client_fd); - + EXPECT_EQ(status2, Drv::SocketIpStatus::SOCK_SUCCESS) << "Failed to connect client"; U32 size = sizeof(m_data_storage); // Not testing with reconnect thread, we will need to open ourselves if (not recv_thread) { status1 = this->component.open(); } else { - EXPECT_TRUE(this->wait_on_change(true, Drv::Test::get_configured_delay_ms()/10 + 1)); + EXPECT_TRUE(this->wait_on_change(true, Drv::Test::get_configured_delay_ms()/10 + 1)) << + "On iteration: " << i << " and receive thread: " << recv_thread; } - EXPECT_TRUE(this->component.isOpened()); + EXPECT_TRUE(this->component.isOpened()) << + "On iteration: " << i << " and receive thread: " << recv_thread; - EXPECT_EQ(status1, Drv::SOCK_SUCCESS); - EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - - status2 = Drv::SOCK_NO_DATA_AVAILABLE; + EXPECT_EQ(status1, Drv::SOCK_SUCCESS) << + "On iteration: " << i << " and receive thread: " << recv_thread; + EXPECT_EQ(status2, Drv::SOCK_SUCCESS) << + "On iteration: " << i << " and receive thread: " << recv_thread; // If all the opens worked, then run this if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) && (this->component.isOpened())) { // Force the sockets not to hang, if at all possible - - Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler()); - Drv::Test::force_recv_timeout(client_fd, client); + Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler()); + Drv::Test::force_recv_timeout(client_fd.fd, client); m_data_buffer.setSize(sizeof(m_data_storage)); - Drv::Test::fill_random_buffer(m_data_buffer); + size = Drv::Test::fill_random_buffer(m_data_buffer); Drv::SendStatus status = invoke_to_send(0, m_data_buffer); - EXPECT_EQ(status, SendStatus::SEND_OK); - U16 counter = 0; - while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) { - status2 = client.recv(client_fd, buffer, size); - counter++; - } - EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - EXPECT_EQ(size, m_data_buffer.getSize()); + EXPECT_EQ(status, SendStatus::SEND_OK) << + "On iteration: " << i << " and receive thread: " << recv_thread; + Drv::Test::receive_all(client, client_fd, buffer, size); + EXPECT_EQ(status2, Drv::SOCK_SUCCESS) << + "On iteration: " << i << " and receive thread: " << recv_thread << " and errno " << errno; + EXPECT_EQ(size, m_data_buffer.getSize()) << + "On iteration: " << i << " and receive thread: " << recv_thread; Drv::Test::validate_random_buffer(m_data_buffer, buffer); // If receive thread is live, try the other way @@ -88,24 +86,30 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) { m_spinner = false; m_data_buffer.setSize(sizeof(m_data_storage)); status2 = client.send(client_fd, m_data_buffer.getData(), m_data_buffer.getSize()); - EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - //from_deallocate_handler(0, m_data_buffer); - while (not m_spinner) {} + EXPECT_EQ(status2, Drv::SOCK_SUCCESS) << + "On iteration: " << i << " and receive thread: " << recv_thread; + if (status2 == Drv::SOCK_SUCCESS) { + while (not m_spinner) {} + } } } // Properly stop the client on the last iteration if (((1 + i) == iterations) && recv_thread) { this->component.stop(); - this->component.close(); - client.close(client_fd); // Client must be closed first or the server risks binding to an existing address - this->component.shutdown(); this->component.join(); } else { + // Server initiates shutdown. It thus must drain its data until it receives + // a socket disconnection. Then it can safely close. + this->component.shutdown(); + Drv::Test::drain(this->component.m_socket, this->component.m_descriptor); this->component.close(); - client.close(client_fd); // Client must be closed first or the server risks binding to an existing address } + // Server should have shutdown cleanly and waited for this to be shut down. It is safe + // to release the file descriptor. + client.close(client_fd); } + this->component.terminate(); ASSERT_from_ready_SIZE(iterations); } @@ -168,7 +172,7 @@ void TcpServerTester ::test_advanced_reconnect() { void TcpServerTester ::from_recv_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& recvBuffer, const RecvStatus& recvStatus) { // this function will still receive a status of error because the recv port is always called this->pushFromPortEntry_recv(recvBuffer, recvStatus); - if (recvStatus == RecvStatus::RECV_OK){ + if (recvStatus == RecvStatus::RECV_OK) { // Make sure we can get to unblocking the spinner EXPECT_EQ(m_data_buffer.getSize(), recvBuffer.getSize()) << "Invalid transmission size"; Drv::Test::validate_random_buffer(m_data_buffer, recvBuffer.getData()); @@ -189,7 +193,6 @@ Fw::Buffer TcpServerTester :: { this->pushFromPortEntry_allocate(size); Fw::Buffer buffer(new U8[size], size); - m_data_buffer2 = buffer; return buffer; } diff --git a/Drv/TcpServer/test/ut/TcpServerTester.hpp b/Drv/TcpServer/test/ut/TcpServerTester.hpp index 318fdff820..ebecd59b9c 100644 --- a/Drv/TcpServer/test/ut/TcpServerTester.hpp +++ b/Drv/TcpServer/test/ut/TcpServerTester.hpp @@ -132,7 +132,6 @@ namespace Drv { //! TcpServerComponentImpl component; Fw::Buffer m_data_buffer; - Fw::Buffer m_data_buffer2; U8 m_data_storage[SEND_DATA_BUFFER_SIZE]; std::atomic m_spinner; diff --git a/Drv/Udp/UdpComponentImpl.cpp b/Drv/Udp/UdpComponentImpl.cpp index 16af1bc1c8..d5ba8f3eed 100644 --- a/Drv/Udp/UdpComponentImpl.cpp +++ b/Drv/Udp/UdpComponentImpl.cpp @@ -23,23 +23,16 @@ namespace Drv { // ---------------------------------------------------------------------- UdpComponentImpl::UdpComponentImpl(const char* const compName) - : UdpComponentBase(compName), - SocketComponentHelper() {} + : UdpComponentBase(compName) {} SocketIpStatus UdpComponentImpl::configureSend(const char* hostname, const U16 port, const U32 send_timeout_seconds, const U32 send_timeout_microseconds) { - if (not this->isStarted()) { - (void)this->startup(); - } return m_socket.configureSend(hostname, port, send_timeout_seconds, send_timeout_microseconds); } SocketIpStatus UdpComponentImpl::configureRecv(const char* hostname, const U16 port) { - if (not this->isStarted()) { - (void)this->startup(); - } return m_socket.configureRecv(hostname, port); } diff --git a/Drv/Udp/UdpComponentImpl.hpp b/Drv/Udp/UdpComponentImpl.hpp index 41a625472f..7829c7ec9b 100644 --- a/Drv/Udp/UdpComponentImpl.hpp +++ b/Drv/Udp/UdpComponentImpl.hpp @@ -147,7 +147,6 @@ class UdpComponentImpl : public UdpComponentBase, public SocketComponentHelper { * \return SEND_OK on success, SEND_RETRY when critical data should be retried and SEND_ERROR upon error */ Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer); - Drv::UdpSocket m_socket; //!< Socket implementation }; diff --git a/Drv/Udp/test/ut/UdpTestMain.cpp b/Drv/Udp/test/ut/UdpTestMain.cpp index 30005bf151..606c7ea908 100644 --- a/Drv/Udp/test/ut/UdpTestMain.cpp +++ b/Drv/Udp/test/ut/UdpTestMain.cpp @@ -4,22 +4,22 @@ #include "UdpTester.hpp" -TEST(Nominal, BasicMessaging) { +TEST(Nominal, UdpBasicMessaging) { Drv::UdpTester tester; tester.test_basic_messaging(); } -TEST(Nominal, BasicReceiveThread) { +TEST(Nominal, UdpBasicReceiveThread) { Drv::UdpTester tester; tester.test_receive_thread(); } -TEST(Reconnect, MultiMessaging) { +TEST(Reconnect, UdpMultiMessaging) { Drv::UdpTester tester; tester.test_multiple_messaging(); } -TEST(Reconnect, ReceiveThreadReconnect) { +TEST(Reconnect, UdpReceiveThreadReconnect) { Drv::UdpTester tester; tester.test_advanced_reconnect(); } diff --git a/Drv/Udp/test/ut/UdpTester.cpp b/Drv/Udp/test/ut/UdpTester.cpp index 3e1dae4099..773a9cbf1f 100644 --- a/Drv/Udp/test/ut/UdpTester.cpp +++ b/Drv/Udp/test/ut/UdpTester.cpp @@ -28,7 +28,7 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) { U8 buffer[sizeof(m_data_storage)] = {}; Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS; Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS; - NATIVE_INT_TYPE udp2_fd = -1; + Drv::SocketDescriptor udp2_fd; U16 port1 = Drv::Test::get_free_port(true); ASSERT_NE(0, port1); @@ -82,26 +82,18 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) { << "UDP socket open error: " << strerror(errno) << std::endl << "Port1: " << port1 << std::endl << "Port2: " << port2 << std::endl; - - status2 = Drv::SOCK_NO_DATA_AVAILABLE; // If all the opens worked, then run this if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) && (this->component.isOpened())) { // Force the sockets not to hang, if at all possible - Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler()); - Drv::Test::force_recv_timeout(udp2_fd, udp2); + Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler()); + Drv::Test::force_recv_timeout(udp2_fd.fd, udp2); m_data_buffer.setSize(sizeof(m_data_storage)); - Drv::Test::fill_random_buffer(m_data_buffer); + size = Drv::Test::fill_random_buffer(m_data_buffer); Drv::SendStatus status = invoke_to_send(0, m_data_buffer); EXPECT_EQ(status, SendStatus::SEND_OK); - U16 counter = 0; - while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) { - status2 = udp2.recv(udp2_fd, buffer, size); - counter++; - } - EXPECT_EQ(status2, Drv::SOCK_SUCCESS); - EXPECT_EQ(size, m_data_buffer.getSize()); + Drv::Test::receive_all(udp2, udp2_fd, buffer, size); Drv::Test::validate_random_buffer(m_data_buffer, buffer); // If receive thread is live, try the other way if (recv_thread) {