Skip to content

Commit

Permalink
Refactor and update the IO/TCP to make it's behaviour more consistent…
Browse files Browse the repository at this point in the history
… and reliable (windows to come)
  • Loading branch information
TrentHouliston committed Aug 9, 2023
1 parent 59fb065 commit 44b35ca
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 223 deletions.
28 changes: 25 additions & 3 deletions src/dsl/word/IO.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ namespace dsl {
namespace word {

struct IOConfiguration {
IOConfiguration(fd_t fd, int events, const std::shared_ptr<threading::Reaction>& reaction)
: fd(fd), events(events), reaction(reaction) {}
fd_t fd;
int events;
std::shared_ptr<threading::Reaction> reaction;
};

struct IOFinished {
IOFinished(const uint64_t& id) : id(id) {}
uint64_t id;
};

/**
* @brief
* This is used to trigger reactions based on standard I/O operations using file descriptors.
Expand Down Expand Up @@ -68,15 +75,25 @@ namespace dsl {
* @par Implements
* Bind
*/
struct IO : public Single {
struct IO {

// On windows we use different wait events
#ifdef _WIN32
// NOLINTNEXTLINE(google-runtime-int)
enum EventType : short{READ = FD_READ | FD_OOB | FD_ACCEPT, WRITE = FD_WRITE, CLOSE = FD_CLOSE, ERROR = 0};
enum EventType : short {
READ = FD_READ | FD_OOB | FD_ACCEPT,
WRITE = FD_WRITE,
CLOSE = FD_CLOSE,
ERROR = 0,
};
#else
// NOLINTNEXTLINE(google-runtime-int)
enum EventType : short { READ = POLLIN, WRITE = POLLOUT, CLOSE = POLLHUP, ERROR = POLLNVAL | POLLERR };
enum EventType : short {
READ = POLLIN,
WRITE = POLLOUT,
CLOSE = POLLHUP,
ERROR = POLLNVAL | POLLERR,
};
#endif

struct Event {
Expand Down Expand Up @@ -114,6 +131,11 @@ namespace dsl {
// Otherwise return an invalid event
return Event{INVALID_SOCKET, 0};
}

template <typename DSL>
static inline void postcondition(threading::ReactionTask& task) {
task.parent.reactor.emit<emit::Direct>(std::make_unique<IOFinished>(task.parent.id));
}
};

} // namespace word
Expand Down
14 changes: 5 additions & 9 deletions src/dsl/word/TCP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace dsl {
* @par Implements
* Bind
*/
struct TCP {
struct TCP : public IO {

struct Connection {

Expand Down Expand Up @@ -124,10 +124,8 @@ namespace dsl {
port = ntohs(address.sin_port);

// Generate a reaction for the IO system that closes on death
const fd_t cfd = fd;
reaction->unbinders.push_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Direct>(std::make_unique<operation::Unbind<IO>>(r.id));
});
const fd_t cfd = fd.release();

reaction->unbinders.push_back([cfd](const threading::Reaction&) {
::shutdown(cfd, SHUT_RDWR);
#ifdef _WIN32
Expand All @@ -137,10 +135,8 @@ namespace dsl {
#endif
});

auto io_config = std::make_unique<IOConfiguration>(IOConfiguration{fd.release(), IO::READ, reaction});

// Send our configuration out
reaction->reactor.emit<emit::Direct>(io_config);
// Bind using the IO system
IO::bind<DSL>(reaction, cfd, IO::READ | IO::CLOSE);

// Return our handles
return std::make_tuple(port, cfd);
Expand Down
45 changes: 11 additions & 34 deletions src/dsl/word/UDP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace dsl {
* @par Implements
* Bind
*/
struct UDP {
struct UDP : public IO {

struct Packet {
Packet() = default;
Expand Down Expand Up @@ -148,16 +148,9 @@ namespace dsl {
port = ntohs(address.sin_port);

// Generate a reaction for the IO system that closes on death
const fd_t cfd = fd;
reaction->unbinders.push_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Direct>(std::make_unique<operation::Unbind<IO>>(r.id));
});
reaction->unbinders.push_back([cfd](const threading::Reaction&) { close(cfd); });

auto io_config = std::make_unique<IOConfiguration>(IOConfiguration{fd.release(), IO::READ, reaction});

// Send our configuration out
reaction->reactor.emit<emit::Direct>(io_config);
const fd_t cfd = fd.release();
reaction->unbinders.push_back([cfd](const threading::Reaction&) { ::close(cfd); });
IO::bind<DSL>(reaction, cfd, IO::READ | IO::CLOSE);

// Return our handles and our bound port
return std::make_tuple(port, cfd);
Expand Down Expand Up @@ -248,7 +241,7 @@ namespace dsl {
return p;
}

struct Broadcast {
struct Broadcast : public IO {

template <typename DSL>
static inline std::tuple<in_port_t, fd_t> bind(const std::shared_ptr<threading::Reaction>& reaction,
Expand Down Expand Up @@ -306,17 +299,9 @@ namespace dsl {
port = ntohs(address.sin_port);

// Generate a reaction for the IO system that closes on death
const fd_t cfd = fd;
reaction->unbinders.push_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Direct>(std::make_unique<operation::Unbind<IO>>(r.id));
});
reaction->unbinders.push_back([cfd](const threading::Reaction&) { close(cfd); });

auto io_config =
std::make_unique<IOConfiguration>(IOConfiguration{fd.release(), IO::READ, reaction});

// Send our configuration out
reaction->reactor.emit<emit::Direct>(io_config);
const fd_t cfd = fd.release();
reaction->unbinders.push_back([cfd](const threading::Reaction&) { ::close(cfd); });
IO::bind<DSL>(reaction, cfd, IO::READ | IO::CLOSE);

// Return our handles and our bound port
return std::make_tuple(port, cfd);
Expand All @@ -328,7 +313,7 @@ namespace dsl {
}
};

struct Multicast {
struct Multicast : public IO {

template <typename DSL>
static inline std::tuple<in_port_t, fd_t> bind(const std::shared_ptr<threading::Reaction>& reaction,
Expand Down Expand Up @@ -412,17 +397,9 @@ namespace dsl {
}

// Generate a reaction for the IO system that closes on death
const fd_t cfd = fd;
reaction->unbinders.push_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Direct>(std::make_unique<operation::Unbind<IO>>(r.id));
});
const fd_t cfd = fd.release();
reaction->unbinders.push_back([cfd](const threading::Reaction&) { close(cfd); });

auto io_config =
std::make_unique<IOConfiguration>(IOConfiguration{fd.release(), IO::READ, reaction});

// Send our configuration out for each file descriptor (same reaction)
reaction->reactor.emit<emit::Direct>(io_config);
IO::bind<DSL>(reaction, cfd, IO::READ | IO::CLOSE);

// Return our handles
return std::make_tuple(port, cfd);
Expand Down
Loading

0 comments on commit 44b35ca

Please sign in to comment.