Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve some of the statistics to allow for better tracing #140

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/message/LogMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ namespace message {
* @param level The logging level of the log
* @param display_level The logging level of the reactor that made this log
* @param message The string contents of the message
* @param task The currently executing task that made this message or nullptr if not in a task
* @param statistics The statistics of the currently executing task or nullptr if not in a task
*/
LogMessage(const LogLevel& level,
const LogLevel& display_level,
std::string message,
std::shared_ptr<const ReactionStatistics> task)
: level(level), display_level(display_level), message(std::move(message)), task(std::move(task)) {}
std::shared_ptr<const ReactionStatistics> statistics)
: level(level)
, display_level(display_level)
, message(std::move(message))
, statistics(std::move(statistics)) {}

/// The logging level of the log
LogLevel level{};
Expand All @@ -59,8 +62,8 @@ namespace message {
/// The string contents of the message
std::string message;

/// The currently executing task that made this message
const std::shared_ptr<const ReactionStatistics> task{nullptr};
/// The statistics of the currently executing task that made this message
const std::shared_ptr<const ReactionStatistics> statistics{nullptr};
};

} // namespace message
Expand Down
4 changes: 2 additions & 2 deletions src/message/ReactionStatistics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ namespace message {
struct ReactionEvent {
enum Event : uint8_t { CREATED, MISSING_DATA, BLOCKED, STARTED, FINISHED };

ReactionEvent(const Event& type, std::shared_ptr<ReactionStatistics> statistics)
ReactionEvent(const Event& type, std::shared_ptr<const ReactionStatistics> statistics)
: type(type), statistics(std::move(statistics)) {}

Event type;
std::shared_ptr<ReactionStatistics> statistics;
std::shared_ptr<const ReactionStatistics> statistics;
};

} // namespace message
Expand Down
10 changes: 7 additions & 3 deletions src/threading/Reaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
namespace NUClear {
namespace threading {

// Initialize our reaction source
std::atomic<NUClear::id_t> Reaction::reaction_id_source(0); // NOLINT

Reaction::Reaction(Reactor& reactor, ReactionIdentifiers&& identifiers, TaskGenerator&& generator)
: reactor(reactor)
, identifiers(std::make_shared<ReactionIdentifiers>(std::move(identifiers)))
Expand Down Expand Up @@ -62,5 +59,12 @@ namespace threading {
bool Reaction::is_enabled() const {
return enabled;
}

NUClear::id_t Reaction::next_id() {
// Start at 1 to make 0 an invalid id
static std::atomic<NUClear::id_t> id_source(1);
return id_source.fetch_add(1, std::memory_order_seq_cst);
}

} // namespace threading
} // namespace NUClear
12 changes: 8 additions & 4 deletions src/threading/Reaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ namespace threading {
std::shared_ptr<const ReactionIdentifiers> identifiers;

/// the unique identifier for this Reaction object
const NUClear::id_t id{reaction_id_source.fetch_add(1, std::memory_order_relaxed)};
const NUClear::id_t id{next_id()};

/// if this is false, we cannot emit ReactionStatistics from any reaction triggered by this one
bool emit_stats{true};
Expand All @@ -119,9 +119,13 @@ namespace threading {
std::vector<std::function<void(Reaction&)>> unbinders;

private:
/// A source for reaction_ids, atomically creates ids
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static std::atomic<NUClear::id_t> reaction_id_source;
/**
* Generate a new unique reaction id.
*
* @return A new unique reaction id
*/
static NUClear::id_t next_id();

/// The callback generator function (creates databound callbacks)
TaskGenerator generator;
};
Expand Down
31 changes: 28 additions & 3 deletions src/threading/ReactionTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>

#include "../id.hpp"
#include "../message/ReactionStatistics.hpp"
#include "../util/platform.hpp"
#include "Reaction.hpp"

Expand Down Expand Up @@ -61,9 +62,33 @@ namespace threading {
current_task = t;
}

NUClear::id_t ReactionTask::new_task_id() {
static std::atomic<NUClear::id_t> task_id_source(0);
return task_id_source.fetch_add(1, std::memory_order_seq_cst);
NUClear::id_t ReactionTask::next_id() {
// Start at 1 to make 0 an invalid id
static std::atomic<NUClear::id_t> id_source(1);
return id_source.fetch_add(1, std::memory_order_seq_cst);
}

std::shared_ptr<message::ReactionStatistics> ReactionTask::make_stats() {

// Stats are disabled if they are disabled in the parent or in the causing task
if ((parent != nullptr && !parent->emit_stats) || (current_task != nullptr && current_task->stats == nullptr)) {
return nullptr;
}

// Identifiers come from the parent reaction if there is one
auto identifiers = parent != nullptr ? parent->identifiers : nullptr;

const auto* ct = current_task;
const auto cause_reaction_id = ct != nullptr && ct->parent != nullptr ? ct->parent->id : 0;
const auto cause_task_id = current_task != nullptr ? current_task->id : 0;
const auto target_reaction_id = parent != nullptr ? parent->id : 0;
const auto target_task_id = id;

return std::make_shared<message::ReactionStatistics>(identifiers,
IDPair{cause_reaction_id, cause_task_id},
IDPair{target_reaction_id, target_task_id},
pool_descriptor,
group_descriptors);
}

// Initialize our current task
Expand Down
23 changes: 11 additions & 12 deletions src/threading/ReactionTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,11 @@ namespace threading {
const GetThreadPool& thread_pool_fn,
const GetGroups& groups_fn)
: parent(parent)
, id(new_task_id())
, id(next_id())
, priority(priority_fn(*this))
, pool_descriptor(thread_pool_fn(*this))
, group_descriptors(groups_fn(*this))
// Only create a stats object if we wouldn't cause an infinite loop of stats
, stats(
parent != nullptr && parent->emit_stats && (current_task == nullptr || current_task->stats != nullptr)
? std::make_shared<message::ReactionStatistics>(
parent->identifiers,
IDPair{parent->id, id},
current_task != nullptr ? IDPair{current_task->parent->id, current_task->id} : IDPair{0, 0},
pool_descriptor,
group_descriptors)
: nullptr) {
, stats(make_stats()) {
// Increment the number of active tasks
if (parent != nullptr) {
parent->active_tasks.fetch_add(1, std::memory_order_release);
Expand Down Expand Up @@ -122,7 +113,7 @@ namespace threading {
*
* @return A new unique task id
*/
static NUClear::id_t new_task_id();
static NUClear::id_t next_id();

/// The parent Reaction object which spawned this, or nullptr if this is a floating task
std::shared_ptr<Reaction> parent;
Expand Down Expand Up @@ -163,6 +154,14 @@ namespace threading {
friend bool operator<(const ReactionTask& lhs, const ReactionTask& rhs) {
return lhs.priority == rhs.priority ? lhs.id < rhs.id : lhs.priority > rhs.priority;
}

private:
/**
* Creates a new ReactionStatistics object for this task.
*
* @return A new ReactionStatistics object for this task
*/
std::shared_ptr<message::ReactionStatistics> make_stats();
};

} // namespace threading
Expand Down
2 changes: 2 additions & 0 deletions src/threading/scheduler/Pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ namespace threading {

/// A thread local pointer to the current pool this thread is running in
static ATTRIBUTE_TLS Pool* current_pool; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

friend class Scheduler;
};

} // namespace scheduler
Expand Down
5 changes: 4 additions & 1 deletion src/threading/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ namespace NUClear {
namespace threading {
namespace scheduler {

Scheduler::Scheduler(const int& thread_count) : default_thread_count(thread_count) {}
Scheduler::Scheduler(const int& thread_count) : default_thread_count(thread_count) {
// Create the main thread pool and assign it as our "current pool" so things we do pre startup are assigned
Pool::current_pool = get_pool(dsl::word::MainThread::descriptor()).get();
}

void Scheduler::start() {
// We have to scope this mutex, otherwise the main thread will hold the mutex while it is running
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class TestReactor : public NUClear::Reactor {
// Capture the log messages
on<Trigger<NUClear::message::LogMessage>>().then([](const NUClear::message::LogMessage& log_message) {
if (log_message.level >= log_message.display_level) {
messages.push_back(LogTestOutput{log_message.message, log_message.level, log_message.task != nullptr});
messages.push_back(
LogTestOutput{log_message.message, log_message.level, log_message.statistics != nullptr});
}
});

Expand Down
Loading