diff --git a/src/extension/IOController.cpp b/src/extension/IOController.cpp new file mode 100644 index 00000000..4d1542bc --- /dev/null +++ b/src/extension/IOController.cpp @@ -0,0 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2015 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifdef _WIN32 + #include "IOController_Windows.ipp" +#else + #include "IOController_Posix.ipp" +#endif // _WIN32 diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 02d671ad..db11af2e 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -20,13 +20,121 @@ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/ -#ifndef NUCLEAR_EXTENSION_IOCONTROLLER -#define NUCLEAR_EXTENSION_IOCONTROLLER +#ifndef NUCLEAR_EXTENSION_IO_CONTROLLER_HPP +#define NUCLEAR_EXTENSION_IO_CONTROLLER_HPP +#include "../Reactor.hpp" +#include "../dsl/word/IO.hpp" +#include "../util/platform.hpp" + +namespace NUClear { +namespace extension { + + class IOController : public Reactor { + struct Task; + + private: + // On windows and posix platforms there are slightly different types that are used #ifdef _WIN32 - #include "IOController_Windows.hpp" + using event_t = long; // NOLINT(google-runtime-int) + using watcher_t = WSAEVENT; + using notifier_t = WSAEVENT; + using tasks_t = std::map; #else - #include "IOController_Posix.hpp" -#endif // _WIN32 + using event_t = decltype(pollfd::events); + using watcher_t = pollfd; + using tasks_t = std::vector; + struct notifier_t { + fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll + fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command + std::mutex mutex; ///< This mutex is used to ensure that a write to poll has worked + }; +#endif + + /** + * A task that is waiting for an IO event. + */ + struct Task { + Task() = default; + Task(const fd_t& fd, event_t listening_events, std::shared_ptr reaction) + : fd(fd), listening_events(listening_events), reaction(std::move(reaction)) {} + + /// The file descriptor we are waiting on + fd_t fd{INVALID_SOCKET}; + /// The events that the task is interested in + event_t listening_events{0}; + /// The events that are waiting to be fired + event_t waiting_events{0}; + /// The events that are currently being processed + event_t processing_events{0}; + /// The reaction that is waiting for this event + std::shared_ptr reaction{nullptr}; + + /** + * Sorts the tasks by their file descriptor. 
+ * + * The tasks are sorted by file descriptor so that when we rebuild the list of file descriptors to poll we + * can assume that if the same file descriptor shows up multiple times it will be next to each other. + * This allows the events that are being watched to be or'ed together. + * + * @param other the other task to compare to + * + * @return `true` if this task is less than the other + */ + bool operator<(const Task& other) const { + return fd == other.fd ? listening_events < other.listening_events : fd < other.fd; + } + }; + + /** + * Rebuilds the list of file descriptors to poll. + * + * This function is called when the list of file descriptors to poll changes. + * It will rebuild the list of file descriptors used by poll. + */ + void rebuild_list(); + + /** + * Fires the event for the task if it is ready. + * + * @param task The task to try to fire the event for + */ + void fire_event(Task& task); + + /** + * Collects the events that have happened and sets them up to fire. + */ + void process_event(watcher_t& event); + + /** + * Bumps the notification pipe to wake up the poll command. + * + * If the poll command is waiting it will wait forever if something doesn't happen. + * When trying to update what to poll or shut down we need to wake it up so it can. 
+ */ + // NOLINTNEXTLINE(readability-make-member-function-const) this changes states + void bump(); + + public: + explicit IOController(std::unique_ptr environment); + + private: + /// The event that is used to wake up the WaitForMultipleEvents call + notifier_t notifier; + + /// Whether or not we are shutting down + std::atomic shutdown{false}; + /// The mutex that protects the tasks list + std::mutex tasks_mutex; + /// Whether or not the list of file descriptors is dirty compared to tasks + bool dirty = true; + /// The list of events that are being watched + std::vector watches; + /// The list of tasks that are waiting for IO events + tasks_t tasks; + }; + +} // namespace extension +} // namespace NUClear -#endif // NUCLEAR_EXTENSION_IOCONTROLLER +#endif // NUCLEAR_EXTENSION_IO_CONTROLLER_HPP diff --git a/src/extension/IOController_Posix.hpp b/src/extension/IOController_Posix.hpp deleted file mode 100644 index 0be49e66..00000000 --- a/src/extension/IOController_Posix.hpp +++ /dev/null @@ -1,391 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2015 NUClear Contributors - * - * This file is part of the NUClear codebase. - * See https://github.com/Fastcode/NUClear for further info. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the - * Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR - * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#ifndef NUCLEAR_EXTENSION_IOCONTROLLER_POSIX_HPP -#define NUCLEAR_EXTENSION_IOCONTROLLER_POSIX_HPP - -#include -#include - -#include -#include -#include - -#include "../Reactor.hpp" -#include "../dsl/word/IO.hpp" - -namespace NUClear { -namespace extension { - - class IOController : public Reactor { - private: - /// The type that poll uses for events - using event_t = decltype(pollfd::events); - - /** - * A task that is waiting for an IO event. - */ - struct Task { - Task() = default; - // NOLINTNEXTLINE(google-runtime-int) - Task(const fd_t& fd, event_t listening_events, std::shared_ptr reaction) - : fd(fd), listening_events(listening_events), reaction(std::move(reaction)) {} - - /// The file descriptor we are waiting on - fd_t fd{-1}; - /// The events that the task is interested in - event_t listening_events{0}; - /// The events that are waiting to be fired - event_t waiting_events{0}; - /// The events that are currently being processed - event_t processing_events{0}; - /// The reaction that is waiting for this event - std::shared_ptr reaction{nullptr}; - - /** - * Sorts the tasks by their file descriptor. - * - * The tasks are sorted by file descriptor so that when we rebuild the list of file descriptors to poll we - * can assume that if the same file descriptor shows up multiple times it will be next to each other. - * This allows the events that are being watched to be or'ed together. 
- * - * @param other the other task to compare to - * - * @return `true` if this task is less than the other - */ - bool operator<(const Task& other) const { - return fd == other.fd ? listening_events < other.listening_events : fd < other.fd; - } - }; - - /** - * Rebuilds the list of file descriptors to poll. - * - * This function is called when the list of file descriptors to poll changes. - * It will rebuild the list of file descriptors used by poll. - */ - void rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Clear our fds to be rebuilt - watches.resize(0); - - // Insert our notify fd - watches.push_back(pollfd{notify_recv, POLLIN, 0}); - - for (const auto& r : tasks) { - // If we are the same fd, then add our interest set and mask out events that are already being processed - if (r.fd == watches.back().fd) { - watches.back().events = event_t((watches.back().events | r.listening_events) - & ~(r.processing_events | r.waiting_events)); - } - // Otherwise add a new one and mask out events that are already being processed - else { - watches.push_back( - pollfd{r.fd, event_t(r.listening_events & ~(r.processing_events | r.waiting_events)), 0}); - } - } - - // We just cleaned the list! - dirty = false; - } - - /** - * Fires the event for the task if it is ready. 
- * - * @param task The task to try to fire the event for - */ - void fire_event(Task& task) { - if (task.processing_events == 0 && task.waiting_events != 0) { - - // Make our event to pass through and store it in the local cache - IO::Event e{}; - e.fd = task.fd; - e.events = task.waiting_events; - - // Submit the task (which should run the get) - IO::ThreadEventStore::value = &e; - std::unique_ptr r = task.reaction->get_task(); - IO::ThreadEventStore::value = nullptr; - - if (r != nullptr) { - // Clear the waiting events, we are now processing them - task.processing_events = task.waiting_events; - task.waiting_events = 0; - - // Mask out the currently processing events so poll doesn't notify for them - auto it = - std::lower_bound(watches.begin(), watches.end(), task.fd, [&](const pollfd& w, const fd_t& fd) { - return w.fd < fd; - }); - if (it != watches.end() && it->fd == task.fd) { - it->events = event_t(it->events & ~task.processing_events); - } - - powerplant.submit(std::move(r)); - } - else { - task.waiting_events = event_t(task.waiting_events | task.processing_events); - task.processing_events = 0; - } - } - } - - /** - * Collects the events that have happened and sets them up to fire. 
- */ - void process_events() { - - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - for (auto& fd : watches) { - - // Something happened - if (fd.revents != 0) { - - // It's our notification handle - if (fd.fd == notify_recv) { - // Read our value to clear it's read status - char val = 0; - if (::read(fd.fd, &val, sizeof(char)) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an error reading our notification pipe?"); - } - } - // It's a regular handle - else { - // Check if we have a read event but 0 bytes to read, this can happen when a socket is closed - // On linux we don't get a close event, we just keep getting read events with 0 bytes - // To make the close event happen if we get a read event with 0 bytes we will check if there are - // any currently processing reads and if not, then close - bool maybe_eof = false; - if ((fd.revents & IO::READ) != 0) { - int bytes_available = 0; - const bool valid = ::ioctl(fd.fd, FIONREAD, &bytes_available) == 0; - if (valid && bytes_available == 0) { - maybe_eof = true; - } - } - - // Find our relevant tasks - auto range = std::equal_range(tasks.begin(), - tasks.end(), - Task{fd.fd, 0, nullptr}, - [](const Task& a, const Task& b) { return a.fd < b.fd; }); - - // There are no tasks for this! - if (range.first == tasks.end()) { - // If this happens then our list is definitely dirty... 
- dirty = true; - } - else { - // Loop through our values - for (auto it = range.first; it != range.second; ++it) { - // Load in the relevant events that happened into the waiting events - it->waiting_events = event_t(it->waiting_events | (it->listening_events & fd.revents)); - - if (maybe_eof && (it->processing_events & IO::READ) == 0) { - it->waiting_events |= IO::CLOSE; - } - - fire_event(*it); - } - } - } - - // Clear the events from poll to avoid double firing - fd.revents = 0; - } - } - } - - /** - * Bumps the notification pipe to wake up the poll command - * - * If the poll command is waiting it will wait forever if something doesn't happen. - * When trying to update what to poll or shut down we need to wake it up so it can. - */ - // NOLINTNEXTLINE(readability-make-member-function-const) this changes states - void bump() { - // Check if there was an error - uint8_t val = 1; - if (::write(notify_send, &val, sizeof(val)) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an error while writing to the notification pipe"); - } - - // Locking here will ensure we won't return until poll is not running - const std::lock_guard lock(poll_mutex); - } - - public: - explicit IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { - - std::array vals = {-1, -1}; - const int i = ::pipe(vals.data()); - if (i < 0) { - throw std::system_error(network_errno, - std::system_category(), - "We were unable to make the notification pipe for IO"); - } - notify_recv = vals[0]; - notify_send = vals[1]; - - // Start by rebuilding the list - rebuild_list(); - - on>().then( - "Configure IO Reaction", - [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - - // NOLINTNEXTLINE(google-runtime-int) - tasks.emplace_back(config.fd, event_t(config.events), config.reaction); - - // Resort our list - std::sort(tasks.begin(), tasks.end()); - - // 
Let the poll command know that stuff happened - dirty = true; - bump(); - }); - - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Find the reaction that finished processing - auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { - return t.reaction->id == event.id; - }); - - if (task != tasks.end()) { - // If the events we were processing included close remove it from the list - if ((task->processing_events & IO::CLOSE) != 0) { - dirty = true; - tasks.erase(task); - } - else { - // Make sure poll isn't currently waiting for an event to happen - bump(); - - // Unmask the events that were just processed - auto it = std::lower_bound(watches.begin(), - watches.end(), - task->fd, - [&](const pollfd& w, const fd_t& fd) { return w.fd < fd; }); - if (it != watches.end() && it->fd == task->fd) { - it->events = event_t(it->events | task->processing_events); - } - - // No longer processing events - task->processing_events = 0; - - // Try to fire again which will check if there are any waiting events - fire_event(*task); - } - } - }); - - on>>().then( - "Unbind IO Reaction", - [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - - // Find our reaction - auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) { - return t.reaction->id == unbind.id; - }); - - if (reaction != tasks.end()) { - tasks.erase(reaction); - } - - // Let the poll command know that stuff happened - dirty = true; - bump(); - }); - - on().then("Shutdown IO Controller", [this] { - // Set shutdown to true so it won't try to poll again - shutdown.store(true); - bump(); - }); - - on().then("IO Controller", [this] { - // To make sure we don't get caught in a weird loop - // shutdown keeps us out here - if (!shutdown.load()) { - - // Rebuild 
the list if something changed - if (dirty) { - rebuild_list(); - } - - // Wait for an event to happen on one of our file descriptors - /* mutex scope */ { - const std::lock_guard lock(poll_mutex); - if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { - throw std::system_error( - network_errno, - std::system_category(), - "There was an IO error while attempting to poll the file descriptors"); - } - } - - // Collect the events that happened into the tasks list - process_events(); - } - }); - } - - private: - /// The receive file descriptor for our notification pipe - fd_t notify_recv{-1}; - /// The send file descriptor for our notification pipe - fd_t notify_send{-1}; - - /// The mutex to wait on when bumping to ensure poll has returned - std::mutex poll_mutex; - - /// Whether or not we are shutting down - std::atomic shutdown{false}; - /// The mutex that protects the tasks list - std::mutex tasks_mutex; - /// Whether or not the list of file descriptors is dirty compared to tasks - bool dirty = true; - /// The list of file descriptors to poll - std::vector watches; - /// The list of tasks that are waiting for IO events - std::vector tasks; - }; - -} // namespace extension -} // namespace NUClear - -#endif // NUCLEAR_EXTENSION_IOCONTROLLER_POSIX_HPP diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp new file mode 100644 index 00000000..c007477b --- /dev/null +++ b/src/extension/IOController_Posix.ipp @@ -0,0 +1,292 @@ +/* + * MIT License + * + * Copyright (c) 2015 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include "IOController.hpp" + +namespace NUClear { +namespace extension { + + void IOController::rebuild_list() { + // Get the lock so we don't concurrently modify the list + const std::lock_guard lock(tasks_mutex); + + // Clear our fds to be rebuilt + watches.resize(0); + + // Insert our notify fd + watches.push_back(pollfd{notifier.recv, POLLIN, 0}); + + for (const auto& r : tasks) { + // If we are the same fd, then add our interest set and mask out events that are already being processed + if (r.fd == watches.back().fd) { + watches.back().events = + event_t((watches.back().events | r.listening_events) & ~(r.processing_events | r.waiting_events)); + } + // Otherwise add a new one and mask out events that are already being processed + else { + watches.push_back( + pollfd{r.fd, event_t(r.listening_events & ~(r.processing_events | r.waiting_events)), 0}); + } + } + + // We just cleaned the list! 
+ dirty = false; + } + + void IOController::fire_event(Task& task) { + if (task.processing_events == 0 && task.waiting_events != 0) { + + // Make our event to pass through and store it in the local cache + IO::Event e{}; + e.fd = task.fd; + e.events = task.waiting_events; + + // Submit the task (which should run the get) + IO::ThreadEventStore::value = &e; + std::unique_ptr r = task.reaction->get_task(); + IO::ThreadEventStore::value = nullptr; + + if (r != nullptr) { + // Clear the waiting events, we are now processing them + task.processing_events = task.waiting_events; + task.waiting_events = 0; + + // Mask out the currently processing events so poll doesn't notify for them + auto it = + std::lower_bound(watches.begin(), watches.end(), task.fd, [&](const pollfd& w, const fd_t& fd) { + return w.fd < fd; + }); + if (it != watches.end() && it->fd == task.fd) { + it->events = event_t(it->events & ~task.processing_events); + } + + powerplant.submit(std::move(r)); + } + else { + task.waiting_events = event_t(task.waiting_events | task.processing_events); + task.processing_events = 0; + } + } + } + + void IOController::process_event(pollfd& event) { + + + // It's our notification handle + if (event.fd == notifier.recv) { + // Read our value to clear it's read status + char val = 0; + if (::read(event.fd, &val, sizeof(char)) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an error reading our notification pipe?"); + } + } + // It's a regular handle + else { + // Check if we have a read event but 0 bytes to read, this can happen when a socket is closed + // On linux we don't get a close event, we just keep getting read events with 0 bytes + // To make the close event happen if we get a read event with 0 bytes we will check if there are + // any currently processing reads and if not, then close + bool maybe_eof = false; + if ((event.revents & IO::READ) != 0) { + int bytes_available = 0; + const bool valid = ::ioctl(event.fd, 
FIONREAD, &bytes_available) == 0; + if (valid && bytes_available == 0) { + maybe_eof = true; + } + } + + // Find our relevant tasks + auto range = std::equal_range(tasks.begin(), + tasks.end(), + Task{event.fd, 0, nullptr}, + [](const Task& a, const Task& b) { return a.fd < b.fd; }); + + // There are no tasks for this! + if (range.first == tasks.end()) { + // If this happens then our list is definitely dirty... + dirty = true; + } + else { + // Loop through our values + for (auto it = range.first; it != range.second; ++it) { + // Load in the relevant events that happened into the waiting events + it->waiting_events = event_t(it->waiting_events | (it->listening_events & event.revents)); + + if (maybe_eof && (it->processing_events & IO::READ) == 0) { + it->waiting_events |= IO::CLOSE; + } + + fire_event(*it); + } + } + } + + // Clear the events from poll to avoid double firing + event.revents = 0; + } + + void IOController::bump() { + // Check if there was an error + uint8_t val = 1; + if (::write(notifier.send, &val, sizeof(val)) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an error while writing to the notification pipe"); + } + + // Locking here will ensure we won't return until poll is not running + const std::lock_guard lock(notifier.mutex); + } + + IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { + + std::array vals = {-1, -1}; + const int i = ::pipe(vals.data()); + if (i < 0) { + throw std::system_error(network_errno, + std::system_category(), + "We were unable to make the notification pipe for IO"); + } + notifier.recv = vals[0]; + notifier.send = vals[1]; + + // Start by rebuilding the list + rebuild_list(); + + on>().then( + "Configure IO Reaction", + [this](const dsl::word::IOConfiguration& config) { + // Lock our mutex to avoid concurrent modification + const std::lock_guard lock(tasks_mutex); + + // NOLINTNEXTLINE(google-runtime-int) + tasks.emplace_back(config.fd, 
event_t(config.events), config.reaction); + + // Resort our list + std::sort(tasks.begin(), tasks.end()); + + // Let the poll command know that stuff happened + dirty = true; + bump(); + }); + + on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { + // Get the lock so we don't concurrently modify the list + const std::lock_guard lock(tasks_mutex); + + // Find the reaction that finished processing + auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { + return t.reaction->id == event.id; + }); + + if (task != tasks.end()) { + // If the events we were processing included close remove it from the list + if ((task->processing_events & IO::CLOSE) != 0) { + dirty = true; + tasks.erase(task); + } + else { + // Make sure poll isn't currently waiting for an event to happen + bump(); + + // Unmask the events that were just processed + auto it = std::lower_bound(watches.begin(), + watches.end(), + task->fd, + [&](const pollfd& w, const fd_t& fd) { return w.fd < fd; }); + if (it != watches.end() && it->fd == task->fd) { + it->events = event_t(it->events | task->processing_events); + } + + // No longer processing events + task->processing_events = 0; + + // Try to fire again which will check if there are any waiting events + fire_event(*task); + } + } + }); + + on>>().then( + "Unbind IO Reaction", + [this](const dsl::operation::Unbind& unbind) { + // Lock our mutex to avoid concurrent modification + const std::lock_guard lock(tasks_mutex); + + // Find our reaction + auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) { + return t.reaction->id == unbind.id; + }); + + if (reaction != tasks.end()) { + tasks.erase(reaction); + } + + // Let the poll command know that stuff happened + dirty = true; + bump(); + }); + + on().then("Shutdown IO Controller", [this] { + // Set shutdown to true so it won't try to poll again + shutdown.store(true); + bump(); + }); + + on().then("IO Controller", [this] { + // To make 
sure we don't get caught in a weird loop + // shutdown keeps us out here + if (!shutdown.load()) { + + // Rebuild the list if something changed + if (dirty) { + rebuild_list(); + } + + // Wait for an event to happen on one of our file descriptors + /* mutex scope */ { + const std::lock_guard lock(notifier.mutex); + if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an IO error while attempting to poll the file descriptors"); + } + } + + // Get the lock so we don't concurrently modify the list + const std::lock_guard lock(tasks_mutex); + for (auto& fd : watches) { + + // Collect the events that happened into the tasks list + // Something happened + if (fd.revents != 0) { + process_event(fd); + } + } + } + }); + } + +} // namespace extension +} // namespace NUClear diff --git a/src/extension/IOController_Windows.hpp b/src/extension/IOController_Windows.hpp deleted file mode 100644 index 2aaf8344..00000000 --- a/src/extension/IOController_Windows.hpp +++ /dev/null @@ -1,335 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2016 NUClear Contributors - * - * This file is part of the NUClear codebase. - * See https://github.com/Fastcode/NUClear for further info. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the - * Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR - * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#ifndef NUCLEAR_EXTENSION_IOCONTROLLER_WINDOWS_HPP -#define NUCLEAR_EXTENSION_IOCONTROLLER_WINDOWS_HPP - -#include "../Reactor.hpp" -#include "../dsl/word/IO.hpp" -#include "../util/platform.hpp" - -namespace NUClear { -namespace extension { - - class IOController : public Reactor { - private: - /// The type that poll uses for events - using event_t = long; // NOLINT(google-runtime-int) - - /** - * A task that is waiting for an IO event. - */ - struct Task { - Task() = default; - Task(const fd_t& fd, event_t listening_events, std::shared_ptr reaction) - : fd(fd), listening_events(listening_events), reaction(std::move(reaction)) {} - - /// The socket we are waiting on - fd_t fd; - /// The events that the task is interested in - event_t listening_events{0}; - /// The events that are waiting to be fired - event_t waiting_events{0}; - /// The events that are currently being processed - event_t processing_events{0}; - /// The reaction that is waiting for this event - std::shared_ptr reaction{nullptr}; - }; - - /** - * Rebuilds the list of file descriptors to poll. - * - * This function is called when the list of file descriptors to poll changes. - * It will rebuild the list of file descriptors used by poll. 
- */ - void rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Clear our fds to be rebuilt - watches.resize(0); - - // Insert our notify fd - watches.push_back(notifier); - - for (const auto& r : tasks) { - watches.push_back(r.first); - } - - // We just cleaned the list! - dirty = false; - } - - /** - * Fires the event for the task if it is ready. - * - * @param task The task to try to fire the event for - */ - void fire_event(Task& task) { - if (task.processing_events == 0 && task.waiting_events != 0) { - - // Make our event to pass through and store it in the local cache - IO::Event e{}; - e.fd = task.fd; - e.events = task.waiting_events; - - // Clear the waiting events, we are now processing them - task.processing_events = task.waiting_events; - task.waiting_events = 0; - - // Submit the task (which should run the get) - IO::ThreadEventStore::value = &e; - std::unique_ptr r = task.reaction->get_task(); - IO::ThreadEventStore::value = nullptr; - - if (r != nullptr) { - powerplant.submit(std::move(r)); - } - else { - // Waiting events are still waiting - task.waiting_events |= task.processing_events; - task.processing_events = 0; - } - } - } - - /** - * Collects the events that have happened and sets them up to fire. - */ - void process_event(const WSAEVENT& event) { - - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - if (event == notifier) { - // Reset the notifier signal - if (!WSAResetEvent(event)) { - throw std::system_error(WSAGetLastError(), - std::system_category(), - "WSAResetEvent() for notifier failed"); - } - } - else { - // Get our associated Event object, which has the reaction - auto r = tasks.find(event); - - // If it was found... 
- if (r != tasks.end()) { - // Enum the socket events to work out which ones fired - WSANETWORKEVENTS wsae; - if (WSAEnumNetworkEvents(r->second.fd, event, &wsae) == SOCKET_ERROR) { - throw std::system_error(WSAGetLastError(), - std::system_category(), - "WSAEnumNetworkEvents() failed"); - } - - r->second.waiting_events |= wsae.lNetworkEvents; - fire_event(r->second); - } - // If we can't find the event then our list is dirty - else { - dirty = true; - } - } - } - - /** - * Bumps the notification pipe to wake up the poll command. - * - * If the poll command is waiting it will wait forever if something doesn't happen. - * When trying to update what to poll or shut down we need to wake it up so it can. - */ - // NOLINTNEXTLINE(readability-make-member-function-const) this changes states - void bump() { - if (!WSASetEvent(notifier)) { - throw std::system_error(WSAGetLastError(), - std::system_category(), - "WSASetEvent() for configure io reaction failed"); - } - } - - /** - * Removes a task from the list and closes the event. 
- * - * @param it The iterator to the task to remove - * - * @return The iterator to the next task - */ - std::map::iterator remove_task(std::map::iterator it) { - // Close the event - WSAEVENT event = it->first; - - // Remove the task - auto new_it = tasks.erase(it); - - // Try to close the WSA event - if (!WSACloseEvent(event)) { - throw std::system_error(WSAGetLastError(), std::system_category(), "WSACloseEvent() failed"); - } - - return new_it; - } - - - public: - explicit IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { - - // Create an event to use for the notifier (used for getting out of WSAWaitForMultipleEvents()) - notifier = WSACreateEvent(); - if (notifier == WSA_INVALID_EVENT) { - throw std::system_error(WSAGetLastError(), - std::system_category(), - "WSACreateEvent() for notifier failed"); - } - - // Start by rebuliding the list - rebuild_list(); - - on>().then( - "Configure IO Reaction", - [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex - std::lock_guard lock(tasks_mutex); - - // Make an event for this SOCKET - auto event = WSACreateEvent(); - if (event == WSA_INVALID_EVENT) { - throw std::system_error(WSAGetLastError(), - std::system_category(), - "WSACreateEvent() for configure io reaction failed"); - } - - // Link the event to signal when there are events on the socket - if (WSAEventSelect(config.fd, event, config.events) == SOCKET_ERROR) { - throw std::system_error(WSAGetLastError(), std::system_category(), "WSAEventSelect() failed"); - } - - // Add all the information to the list and mark the list as dirty, to sync with the list of events - tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction})); - dirty = true; - - bump(); - }); - - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Find the reaction that finished processing - auto it = 
std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { - return t.second.reaction->id == event.id; - }); - - // If we found it then clear the waiting events - if (it != tasks.end()) { - auto& task = it->second; - // If the events we were processing included close remove it from the list - if (task.processing_events & IO::CLOSE) { - dirty = true; - remove_task(it); - } - else { - // We have finished processing events - task.processing_events = 0; - - // Try to fire again which will check if there are any waiting events - fire_event(task); - } - } - }); - - on>>().then( - "Unbind IO Reaction", - [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - - // Find our reaction - auto it = std::find_if(tasks.begin(), tasks.end(), [&unbind](const std::pair& t) { - return t.second.reaction->id == unbind.id; - }); - - if (it != tasks.end()) { - remove_task(it); - } - - // Let the poll command know that stuff happened - dirty = true; - bump(); - }); - - on().then("Shutdown IO Controller", [this] { - // Set shutdown to true so it won't try to poll again - shutdown.store(true); - bump(); - }); - - on().then("IO Controller", [this] { - // To make sure we don't get caught in a weird loop - // shutdown keeps us out here - if (!shutdown.load()) { - - // Rebuild the list if something changed - if (dirty) { - rebuild_list(); - } - - // Wait for events - auto event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), - watches.data(), - false, - WSA_INFINITE, - false); - - // Check if the return value is an event in our list - if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) { - // Get the signalled event - auto& event = watches[event_index - WSA_WAIT_EVENT_0]; - - // Collect the events that happened into the tasks list - process_event(event); - } - } - }); - } - - private: - /// The event that is used to wake up the 
WaitForMultipleEvents call - WSAEVENT notifier; - - /// Whether or not we are shutting down - std::atomic shutdown{false}; - /// The mutex that protects the tasks list - std::mutex tasks_mutex; - /// Whether or not the list of file descriptors is dirty compared to tasks - bool dirty = true; - - /// The list of tasks that are currently being processed - std::vector watches; - /// The list of tasks that are waiting for IO events - std::map tasks; - }; - -} // namespace extension -} // namespace NUClear - -#endif // NUCLEAR_EXTENSION_IOCONTROLLER_WINDOWS_HPP diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp new file mode 100644 index 00000000..b7c124be --- /dev/null +++ b/src/extension/IOController_Windows.ipp @@ -0,0 +1,261 @@ +/* + * MIT License + * + * Copyright (c) 2016 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "IOController.hpp"
+
+namespace NUClear {
+namespace extension {
+
+    /**
+     * Removes a task from the list and closes the event
+     *
+     * @param tasks the map of tasks to remove the task from
+     * @param it    the iterator to the task to remove
+     *
+     * @return the iterator to the next task
+     */
+    std::map<WSAEVENT, IOController::Task>::iterator remove_task(std::map<WSAEVENT, IOController::Task>& tasks,
+                                                                 std::map<WSAEVENT, IOController::Task>::iterator it) {
+        // Close the event
+        WSAEVENT event = it->first;
+
+        // Remove the task
+        auto new_it = tasks.erase(it);
+
+        // Try to close the WSA event
+        if (!WSACloseEvent(event)) {
+            throw std::system_error(WSAGetLastError(), std::system_category(), "WSACloseEvent() failed");
+        }
+
+        return new_it;
+    }
+
+    void IOController::rebuild_list() {
+        // Get the lock so we don't concurrently modify the list
+        const std::lock_guard<std::mutex> lock(tasks_mutex);
+
+        // Clear our fds to be rebuilt
+        watches.resize(0);
+
+        // Insert our notify fd
+        watches.push_back(notifier);
+
+        for (const auto& r : tasks) {
+            watches.push_back(r.first);
+        }
+
+        // We just cleaned the list!
+        dirty = false;
+    }
+
+    void IOController::fire_event(Task& task) {
+        if (task.processing_events == 0 && task.waiting_events != 0) {
+
+            // Make our event to pass through and store it in the local cache
+            IO::Event e{};
+            e.fd     = task.fd;
+            e.events = task.waiting_events;
+
+            // Clear the waiting events, we are now processing them
+            task.processing_events = task.waiting_events;
+            task.waiting_events    = 0;
+
+            // Submit the task (which should run the get)
+            IO::ThreadEventStore::value = &e;
+            std::unique_ptr<threading::ReactionTask> r = task.reaction->get_task();
+            IO::ThreadEventStore::value = nullptr;
+
+            if (r != nullptr) {
+                powerplant.submit(std::move(r));
+            }
+            else {
+                // Waiting events are still waiting
+                task.waiting_events |= task.processing_events;
+                task.processing_events = 0;
+            }
+        }
+    }
+
+    void IOController::process_event(const WSAEVENT& event) {
+
+        // Get the lock so we don't concurrently modify the list
+        const std::lock_guard<std::mutex> lock(tasks_mutex);
+
+        if (event == notifier) {
+            // Reset the notifier signal
+            if (!WSAResetEvent(event)) {
+                throw std::system_error(WSAGetLastError(),
+                                        std::system_category(),
+                                        "WSAResetEvent() for notifier failed");
+            }
+        }
+        else {
+            // Get our associated Event object, which has the reaction
+            auto r = tasks.find(event);
+
+            // If it was found...
+            if (r != tasks.end()) {
+                // Enum the socket events to work out which ones fired
+                WSANETWORKEVENTS wsae;
+                if (WSAEnumNetworkEvents(r->second.fd, event, &wsae) == SOCKET_ERROR) {
+                    throw std::system_error(WSAGetLastError(), std::system_category(), "WSAEnumNetworkEvents() failed");
+                }
+
+                r->second.waiting_events |= wsae.lNetworkEvents;
+                fire_event(r->second);
+            }
+            // If we can't find the event then our list is dirty
+            else {
+                dirty = true;
+            }
+        }
+    }
+
+    void IOController::bump() {
+        if (!WSASetEvent(notifier)) {
+            throw std::system_error(WSAGetLastError(),
+                                    std::system_category(),
+                                    "WSASetEvent() for configure io reaction failed");
+        }
+    }
+
+
+    IOController::IOController(std::unique_ptr<NUClear::Environment> environment) : Reactor(std::move(environment)) {
+
+        // Create an event to use for the notifier (used for getting out of WSAWaitForMultipleEvents())
+        notifier = WSACreateEvent();
+        if (notifier == WSA_INVALID_EVENT) {
+            throw std::system_error(WSAGetLastError(), std::system_category(), "WSACreateEvent() for notifier failed");
+        }
+
+        // Start by rebuilding the list
+        rebuild_list();
+
+        on<Trigger<dsl::word::IOConfiguration>>().then(
+            "Configure IO Reaction",
+            [this](const dsl::word::IOConfiguration& config) {
+                // Lock our mutex
+                std::lock_guard<std::mutex> lock(tasks_mutex);
+
+                // Make an event for this SOCKET
+                auto event = WSACreateEvent();
+                if (event == WSA_INVALID_EVENT) {
+                    throw std::system_error(WSAGetLastError(),
+                                            std::system_category(),
+                                            "WSACreateEvent() for configure io reaction failed");
+                }
+
+                // Link the event to signal when there are events on the socket
+                if (WSAEventSelect(config.fd, event, config.events) == SOCKET_ERROR) {
+                    throw std::system_error(WSAGetLastError(), std::system_category(), "WSAEventSelect() failed");
+                }
+
+                // Add all the information to the list and mark the list as dirty, to sync with the list of events
+                tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction}));
+                dirty = true;
+
+                bump();
+            });
+
+        on<Trigger<dsl::word::IOFinished>>().then("IO Finished", [this](const dsl::word::IOFinished& event) {
+            // Get the lock so we don't concurrently modify the list
+            const std::lock_guard<std::mutex> lock(tasks_mutex);
+
+            // Find the reaction that finished processing
+            auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair<const WSAEVENT, Task>& t) {
+                return t.second.reaction->id == event.id;
+            });
+
+            // If we found it then clear the waiting events
+            if (it != tasks.end()) {
+                auto& task = it->second;
+                // If the events we were processing included close remove it from the list
+                if (task.processing_events & IO::CLOSE) {
+                    dirty = true;
+                    remove_task(tasks, it);
+                }
+                else {
+                    // We have finished processing events
+                    task.processing_events = 0;
+
+                    // Try to fire again which will check if there are any waiting events
+                    fire_event(task);
+                }
+            }
+        });
+
+        on<Trigger<dsl::operation::Unbind<IO>>>().then(
+            "Unbind IO Reaction",
+            [this](const dsl::operation::Unbind<IO>& unbind) {
+                // Lock our mutex to avoid concurrent modification
+                const std::lock_guard<std::mutex> lock(tasks_mutex);
+
+                // Find our reaction
+                auto it = std::find_if(tasks.begin(), tasks.end(), [&unbind](const std::pair<const WSAEVENT, Task>& t) {
+                    return t.second.reaction->id == unbind.id;
+                });
+
+                if (it != tasks.end()) {
+                    remove_task(tasks, it);
+                }
+
+                // Let the poll command know that stuff happened
+                dirty = true;
+                bump();
+            });
+
+        on<Shutdown>().then("Shutdown IO Controller", [this] {
+            // Set shutdown to true so it won't try to poll again
+            shutdown.store(true);
+            bump();
+        });
+
+        on<Always>().then("IO Controller", [this] {
+            // To make sure we don't get caught in a weird loop
+            // shutdown keeps us out here
+            if (!shutdown.load()) {
+
+                // Rebuild the list if something changed
+                if (dirty) {
+                    rebuild_list();
+                }
+
+                // Wait for events
+                auto event_index = WSAWaitForMultipleEvents(static_cast<DWORD>(watches.size()),
+                                                            watches.data(),
+                                                            false,
+                                                            WSA_INFINITE,
+                                                            false);
+
+                // Check if the return value is an event in our list
+                if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) {
+                    // Get the signalled event
+                    auto& event = watches[event_index - WSA_WAIT_EVENT_0];
+
+                    // Collect the events that happened into the tasks list
+                    process_event(event);
+                }
+            }
+        });
+    }
+
+}  // namespace extension
+}  // namespace NUClear
diff --git a/src/util/TransientDataElements.hpp b/src/util/TransientDataElements.hpp
index 5a183e32..9ab9ed52 100644
--- a/src/util/TransientDataElements.hpp
+++ b/src/util/TransientDataElements.hpp
@@ -23,6 +23,9 @@
 #ifndef NUCLEAR_UTIL_TRANSIENTDATAELEMENTS_HPP
 #define NUCLEAR_UTIL_TRANSIENTDATAELEMENTS_HPP
 
+#include "../dsl/trait/is_transient.hpp"
+#include "Sequence.hpp"
+
 namespace NUClear {
 namespace util {