Skip to content

Commit

Permalink
Make shutdowns more reliable by making poll have its own running flag (
Browse files Browse the repository at this point in the history
…#126)

Currently shutdowns aren't always reliable: when the Shutdown event is
emitted, the scheduler itself hasn't yet been told to stop. Therefore the
on<Always> reaction that the IO Controller runs in may decide to start up
again before the scheduler stops accepting tasks.

It will then sit on the poll forever
  • Loading branch information
TrentHouliston authored Aug 15, 2024
1 parent 03d3138 commit aa44740
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/macos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ jobs:
- name: Test
timeout-minutes: 3
working-directory: build/tests
run: ctest -j$(nproc) --output-on-failure
run: ctest --output-on-failure -j$(nproc)
2 changes: 1 addition & 1 deletion .github/workflows/windows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ jobs:
- name: Test
timeout-minutes: 3
working-directory: build/tests
run: ctest -j$(nproc) --output-on-failure
run: ctest --output-on-failure -j$(nproc)
11 changes: 0 additions & 11 deletions src/PowerPlant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ PowerPlant::~PowerPlant() {

void PowerPlant::start() {

// We are now running
is_running.store(true, std::memory_order_release);

// Direct emit startup event and command line arguments
emit<dsl::word::emit::Direct>(std::make_unique<dsl::word::Startup>());
emit_shared<dsl::word::emit::Direct>(dsl::store::DataStore<message::CommandLineArguments>::get());
Expand Down Expand Up @@ -148,19 +145,11 @@ void PowerPlant::log(const LogLevel& level, std::stringstream& message) {

void PowerPlant::shutdown() {

// Stop running before we emit the Shutdown event
// Some things such as on<Always> depend on this flag and it's possible to miss it
is_running.store(false, std::memory_order_release);

// Emit our shutdown event
emit(std::make_unique<dsl::word::Shutdown>());

// Shutdown the scheduler
scheduler.shutdown();
}

bool PowerPlant::running() const {
return is_running.load(std::memory_order_acquire);
}

} // namespace NUClear
9 changes: 0 additions & 9 deletions src/PowerPlant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,6 @@ class PowerPlant {
*/
void shutdown();

/**
* Gets the current running state of the PowerPlant.
*
* @return `true` if the PowerPlant is running, `false` if it is shut down, or is in the process of shutting down.
*/
bool running() const;

/**
* Installs a reactor of a particular type to the system.
*
Expand Down Expand Up @@ -352,8 +345,6 @@ class PowerPlant {
threading::TaskScheduler scheduler;
/// Our vector of Reactors, will get destructed when this vector is
std::vector<std::unique_ptr<NUClear::Reactor>> reactors;
/// True if the powerplant is running
std::atomic<bool> is_running{false};
};

/**
Expand Down
7 changes: 4 additions & 3 deletions src/dsl/word/UDP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ namespace dsl {
// Get our file descriptor from the magic cache
auto event = IO::get<DSL>(task);

// If our get is being run without an fd (something else triggered) then short circuit
if (!event) {
// If our get is being run without an fd (something else triggered),
// or if the event is not a read event, then short circuit
if (!event || (event.events & IO::READ) != IO::READ) {
return {};
}

Expand All @@ -366,7 +367,7 @@ namespace dsl {
mh.msg_iovlen = 1;

// Receive our message
ssize_t received = recvmsg(event.fd, &mh, 0);
ssize_t received = recvmsg(event.fd, &mh, MSG_DONTWAIT);
if (received < 0) {
return {};
}
Expand Down
2 changes: 2 additions & 0 deletions src/extension/IOController.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ namespace extension {
private:
/// The event that is used to wake up the WaitForMultipleEvents call
notifier_t notifier;
/// If the IOController should continue running
std::atomic<bool> running{true};

/// The mutex that protects the tasks list
std::mutex tasks_mutex;
Expand Down
45 changes: 24 additions & 21 deletions src/extension/IOController_Posix.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace extension {
watches.resize(0);

// Insert our notify fd
watches.push_back(pollfd{notifier.recv, POLLIN, 0});
watches.push_back(pollfd{notifier.recv, POLLIN | POLLERR | POLLNVAL, 0});

for (const auto& r : tasks) {
// If we are the same fd, then add our interest set and mask out events that are already being processed
Expand Down Expand Up @@ -249,34 +249,37 @@ namespace extension {
});

on<Shutdown>().then("Shutdown IO Controller", [this] {
// Ensure the poll command is not waiting
running.store(false, std::memory_order_release);
bump();
});

on<Always>().then("IO Controller", [this] {
// Rebuild the list if something changed
if (dirty) {
rebuild_list();
}
// Stay in this reaction to improve the performance without going back/forth between reactions
if (running.load(std::memory_order_acquire)) {
// Rebuild the list if something changed
if (dirty) {
rebuild_list();
}

// Wait for an event to happen on one of our file descriptors
/* mutex scope */ {
const std::lock_guard<std::mutex> lock(notifier.mutex);
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an IO error while attempting to poll the file descriptors");
// Wait for an event to happen on one of our file descriptors
/* mutex scope */ {
const std::lock_guard<std::mutex> lock(notifier.mutex);
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an IO error while attempting to poll the file descriptors");
}
}
}

// Get the lock so we don't concurrently modify the list
const std::lock_guard<std::mutex> lock(tasks_mutex);
for (auto& fd : watches) {
// Get the lock so we don't concurrently modify the list
const std::lock_guard<std::mutex> lock(tasks_mutex);
for (auto& fd : watches) {

// Collect the events that happened into the tasks list
// Something happened
if (fd.revents != 0) {
process_event(fd);
// Collect the events that happened into the tasks list
// Something happened
if (fd.revents != 0) {
process_event(fd);
}
}
}
});
Expand Down
47 changes: 26 additions & 21 deletions src/extension/IOController_Windows.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -224,32 +224,37 @@ namespace extension {
bump();
});

on<Shutdown>().then("Shutdown IO Controller", [this] { bump(); });
on<Shutdown>().then("Shutdown IO Controller", [this] {
running.store(false, std::memory_order_release);
bump();
});

on<Always>().then("IO Controller", [this] {
// Rebuild the list if something changed
if (dirty) {
rebuild_list();
}
while (running.load(std::memory_order_acquire)) {
// Rebuild the list if something changed
if (dirty) {
rebuild_list();
}

// Wait for events
DWORD event_index = 0;
/*mutex scope*/ {
const std::lock_guard<std::mutex> lock(notifier.mutex);
event_index = WSAWaitForMultipleEvents(static_cast<DWORD>(watches.size()),
watches.data(),
false,
WSA_INFINITE,
false);
}
// Wait for events
DWORD event_index = 0;
/*mutex scope*/ {
const std::lock_guard<std::mutex> lock(notifier.mutex);
event_index = WSAWaitForMultipleEvents(static_cast<DWORD>(watches.size()),
watches.data(),
false,
WSA_INFINITE,
false);
}

// Check if the return value is an event in our list
if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) {
// Get the signalled event
auto& event = watches[event_index - WSA_WAIT_EVENT_0];
// Check if the return value is an event in our list
if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) {
// Get the signalled event
auto& event = watches[event_index - WSA_WAIT_EVENT_0];

// Collect the events that happened into the tasks list
process_event(event);
// Collect the events that happened into the tasks list
process_event(event);
}
}
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/util/platform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ using socklen_t = int;
#define msg_controllen Control.len
#define msg_flags flags

// Windows doesn't have this flag, maybe we can implement it later for recvmsg
#define MSG_DONTWAIT 0

// Reimplement the recvmsg function
int recvmsg(fd_t fd, msghdr* msg, int flags);

Expand Down
1 change: 0 additions & 1 deletion tests/tests/dsl/UDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ TEST_CASE("Testing sending and receiving of UDP messages", "[api][network][udp]"
NUClear::Configuration config;
config.thread_count = 1;
NUClear::PowerPlant plant(config);
plant.install<NUClear::extension::IOController>();
plant.install<TestReactor>();
plant.start();

Expand Down

0 comments on commit aa44740

Please sign in to comment.