diff --git a/src/message/LogMessage.hpp b/src/message/LogMessage.hpp index 1ac21e20..1d8fcc64 100644 --- a/src/message/LogMessage.hpp +++ b/src/message/LogMessage.hpp @@ -42,13 +42,16 @@ namespace message { * @param level The logging level of the log * @param display_level The logging level of the reactor that made this log * @param message The string contents of the message - * @param task The currently executing task that made this message or nullptr if not in a task + * @param statistics The statistics of the currently executing task or nullptr if not in a task */ LogMessage(const LogLevel& level, const LogLevel& display_level, std::string message, - std::shared_ptr task) - : level(level), display_level(display_level), message(std::move(message)), task(std::move(task)) {} + std::shared_ptr statistics) + : level(level) + , display_level(display_level) + , message(std::move(message)) + , statistics(std::move(statistics)) {} /// The logging level of the log LogLevel level{}; @@ -59,8 +62,8 @@ namespace message { /// The string contents of the message std::string message; - /// The currently executing task that made this message - const std::shared_ptr task{nullptr}; + /// The statistics of the currently executing task that made this message + const std::shared_ptr statistics{nullptr}; }; } // namespace message diff --git a/src/message/ReactionStatistics.hpp b/src/message/ReactionStatistics.hpp index 109c7b1e..e8fcbe2b 100644 --- a/src/message/ReactionStatistics.hpp +++ b/src/message/ReactionStatistics.hpp @@ -109,11 +109,11 @@ namespace message { struct ReactionEvent { enum Event : uint8_t { CREATED, MISSING_DATA, BLOCKED, STARTED, FINISHED }; - ReactionEvent(const Event& type, std::shared_ptr statistics) + ReactionEvent(const Event& type, std::shared_ptr statistics) : type(type), statistics(std::move(statistics)) {} Event type; - std::shared_ptr statistics; + std::shared_ptr statistics; }; } // namespace message diff --git a/src/threading/Reaction.cpp b/src/threading/Reaction.cpp index 2b39d6d2..778ac0e2 100644 --- a/src/threading/Reaction.cpp +++ b/src/threading/Reaction.cpp @@ -32,9 +32,6 @@ namespace NUClear { namespace threading { - // Initialize our reaction source - std::atomic Reaction::reaction_id_source(0); // NOLINT - Reaction::Reaction(Reactor& reactor, ReactionIdentifiers&& identifiers, TaskGenerator&& generator) : reactor(reactor) , identifiers(std::make_shared(std::move(identifiers))) @@ -62,5 +59,12 @@ namespace threading { bool Reaction::is_enabled() const { return enabled; } + + NUClear::id_t Reaction::next_id() { + // Start at 1 to make 0 an invalid id + static std::atomic id_source(1); + return id_source.fetch_add(1, std::memory_order_seq_cst); + } + } // namespace threading } // namespace NUClear diff --git a/src/threading/Reaction.hpp b/src/threading/Reaction.hpp index 880bc206..1e5b7a84 100644 --- a/src/threading/Reaction.hpp +++ b/src/threading/Reaction.hpp @@ -104,7 +104,7 @@ namespace threading { std::shared_ptr identifiers; /// the unique identifier for this Reaction object - const NUClear::id_t id{reaction_id_source.fetch_add(1, std::memory_order_relaxed)}; + const NUClear::id_t id{next_id()}; /// if this is false, we cannot emit ReactionStatistics from any reaction triggered by this one bool emit_stats{true}; @@ -119,9 +119,13 @@ namespace threading { std::vector> unbinders; private: - /// A source for reaction_ids, atomically creates ids - // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) - static std::atomic reaction_id_source; + /** + * Generate a new unique reaction id. + * + * @return A new unique reaction id + */ + static NUClear::id_t next_id(); + /// The callback generator function (creates databound callbacks) TaskGenerator generator; }; diff --git a/src/threading/ReactionTask.cpp b/src/threading/ReactionTask.cpp index d3f4c12a..2715f697 100644 --- a/src/threading/ReactionTask.cpp +++ b/src/threading/ReactionTask.cpp @@ -27,6 +27,7 @@ #include #include "../id.hpp" +#include "../message/ReactionStatistics.hpp" #include "../util/platform.hpp" #include "Reaction.hpp" @@ -61,9 +62,33 @@ namespace threading { current_task = t; } - NUClear::id_t ReactionTask::new_task_id() { - static std::atomic task_id_source(0); - return task_id_source.fetch_add(1, std::memory_order_seq_cst); + NUClear::id_t ReactionTask::next_id() { + // Start at 1 to make 0 an invalid id + static std::atomic id_source(1); + return id_source.fetch_add(1, std::memory_order_seq_cst); + } + + std::shared_ptr ReactionTask::make_stats() { + + // Stats are disabled if they are disabled in the parent or in the causing task + if ((parent != nullptr && !parent->emit_stats) || (current_task != nullptr && current_task->stats == nullptr)) { + return nullptr; + } + + // Identifiers come from the parent reaction if there is one + auto identifiers = parent != nullptr ? parent->identifiers : nullptr; + + const auto* ct = current_task; + const auto cause_reaction_id = ct != nullptr && ct->parent != nullptr ? ct->parent->id : 0; + const auto cause_task_id = current_task != nullptr ? current_task->id : 0; + const auto target_reaction_id = parent != nullptr ? parent->id : 0; + const auto target_task_id = id; + + return std::make_shared(identifiers, + IDPair{cause_reaction_id, cause_task_id}, + IDPair{target_reaction_id, target_task_id}, + pool_descriptor, + group_descriptors); } // Initialize our current task diff --git a/src/threading/ReactionTask.hpp b/src/threading/ReactionTask.hpp index ce43d9e1..c451f12e 100644 --- a/src/threading/ReactionTask.hpp +++ b/src/threading/ReactionTask.hpp @@ -79,20 +79,11 @@ namespace threading { const GetThreadPool& thread_pool_fn, const GetGroups& groups_fn) : parent(parent) - , id(new_task_id()) + , id(next_id()) , priority(priority_fn(*this)) , pool_descriptor(thread_pool_fn(*this)) , group_descriptors(groups_fn(*this)) - // Only create a stats object if we wouldn't cause an infinite loop of stats - , stats( - parent != nullptr && parent->emit_stats && (current_task == nullptr || current_task->stats != nullptr) - ? std::make_shared( - parent->identifiers, - IDPair{parent->id, id}, - current_task != nullptr ? IDPair{current_task->parent->id, current_task->id} : IDPair{0, 0}, - pool_descriptor, - group_descriptors) - : nullptr) { + , stats(make_stats()) { // Increment the number of active tasks if (parent != nullptr) { parent->active_tasks.fetch_add(1, std::memory_order_release); @@ -122,7 +113,7 @@ namespace threading { * * @return A new unique task id */ - static NUClear::id_t new_task_id(); + static NUClear::id_t next_id(); /// The parent Reaction object which spawned this, or nullptr if this is a floating task std::shared_ptr parent; @@ -163,6 +154,14 @@ namespace threading { friend bool operator<(const ReactionTask& lhs, const ReactionTask& rhs) { return lhs.priority == rhs.priority ? lhs.id < rhs.id : lhs.priority > rhs.priority; } + + private: + /** + * Creates a new ReactionStatistics object for this task. + * + * @return A new ReactionStatistics object for this task + */ + std::shared_ptr make_stats(); }; } // namespace threading diff --git a/src/threading/scheduler/Pool.hpp b/src/threading/scheduler/Pool.hpp index 78d27898..a5b8e823 100644 --- a/src/threading/scheduler/Pool.hpp +++ b/src/threading/scheduler/Pool.hpp @@ -232,6 +232,8 @@ namespace threading { /// A thread local pointer to the current pool this thread is running in static ATTRIBUTE_TLS Pool* current_pool; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + + friend class Scheduler; }; } // namespace scheduler diff --git a/src/threading/scheduler/Scheduler.cpp b/src/threading/scheduler/Scheduler.cpp index a39959f7..b90f1311 100644 --- a/src/threading/scheduler/Scheduler.cpp +++ b/src/threading/scheduler/Scheduler.cpp @@ -31,7 +31,10 @@ namespace NUClear { namespace threading { namespace scheduler { - Scheduler::Scheduler(const int& thread_count) : default_thread_count(thread_count) {} + Scheduler::Scheduler(const int& thread_count) : default_thread_count(thread_count) { + // Create the main thread pool and assign it as our "current pool" so things we do pre startup are assigned + Pool::current_pool = get_pool(dsl::word::MainThread::descriptor()).get(); + } void Scheduler::start() { // We have to scope this mutex, otherwise the main thread will hold the mutex while it is running diff --git a/tests/tests/log/Log.cpp b/tests/tests/log/Log.cpp index a0097c69..4bbcb3a7 100644 --- a/tests/tests/log/Log.cpp +++ b/tests/tests/log/Log.cpp @@ -57,7 +57,8 @@ class TestReactor : public NUClear::Reactor { // Capture the log messages on>().then([](const NUClear::message::LogMessage& log_message) { if (log_message.level >= log_message.display_level) { - messages.push_back(LogTestOutput{log_message.message, log_message.level, log_message.task != nullptr}); + messages.push_back( + LogTestOutput{log_message.message, log_message.level, log_message.statistics != nullptr}); } });