Skip to content

Commit

Permalink
Change how ReactionStatistics works to be more useful for monitoring …
Browse files Browse the repository at this point in the history
…events (#125)

Changes ReactionStatistics so it no longer only fires at the very end of
a reaction. It will now fire progress updates as it goes through to
allow monitoring of how reactions are going.

It also better conforms to trace file semantics so that they can be used
to track reactions

Superceeds #99 as it integrates those changes too
  • Loading branch information
TrentHouliston authored Aug 15, 2024
1 parent 0767eab commit 03d3138
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 58 deletions.
3 changes: 3 additions & 0 deletions src/dsl/operation/TypeBind.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace NUClear {

// Forward declarations
namespace message {
struct ReactionEvent;
struct ReactionStatistics;
struct LogMessage;
} // namespace message
Expand All @@ -40,6 +41,8 @@ namespace dsl {
template <typename T>
struct EmitStats : std::true_type {};
template <>
struct EmitStats<message::ReactionEvent> : std::false_type {};
template <>
struct EmitStats<message::ReactionStatistics> : std::false_type {};
template <>
struct EmitStats<message::LogMessage> : std::false_type {};
Expand Down
6 changes: 6 additions & 0 deletions src/id.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ namespace NUClear {
/// This type is used when NUClear requires a unique identifier
using id_t = std::size_t;

/// A reaction and task id pair identify a specific task/reaction combination
struct IDPair {
id_t reaction_id{0};
id_t task_id{0};
};

} // namespace NUClear

#endif // NUCLEAR_UTIL_ID_HPP
105 changes: 71 additions & 34 deletions src/message/ReactionStatistics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@

#include <exception>
#include <string>
#include <thread>
#include <vector>

#include "../clock.hpp"
#include "../id.hpp"
#include "../threading/ReactionIdentifiers.hpp"
#include "../util/GroupDescriptor.hpp"
#include "../util/ThreadPoolDescriptor.hpp"
#include "../util/usage_clock.hpp"

namespace NUClear {
namespace message {
Expand All @@ -39,43 +43,76 @@ namespace message {
*/
struct ReactionStatistics {

struct Event {
struct ThreadInfo {
std::thread::id thread_id;
util::ThreadPoolDescriptor pool{util::ThreadPoolDescriptor::AllPools()};
};

ThreadInfo thread{};
NUClear::clock::time_point nuclear_time;
std::chrono::steady_clock::time_point realtime;
util::cpu_clock::time_point cpu_time;

static Event now() {

// Get the thread pool information for this thread if it's a NUClear thread
const Event::ThreadInfo thread_info;

return Event{
thread_info,
NUClear::clock::now(),
std::chrono::steady_clock::now(),
util::cpu_clock::now(),
};
}
};

ReactionStatistics(std::shared_ptr<const threading::ReactionIdentifiers> identifiers,
const NUClear::id_t& reaction_id,
const NUClear::id_t& task_id,
const NUClear::id_t& cause_reaction_id,
const NUClear::id_t& cause_task_id,
const clock::time_point& emitted,
const clock::time_point& start,
const clock::time_point& finish,
std::exception_ptr exception)
const IDPair& cause,
const IDPair& target,
util::ThreadPoolDescriptor target_threadpool,
util::GroupDescriptor target_group)
: identifiers(std::move(identifiers))
, reaction_id(reaction_id)
, task_id(task_id)
, cause_reaction_id(cause_reaction_id)
, cause_task_id(cause_task_id)
, emitted(emitted)
, started(start)
, finished(finish)
, exception(std::move(exception)) {}

/// A string containing the username/on arguments/and callback name of the reaction
, cause(cause)
, target(target)
, target_threadpool(std::move(target_threadpool))
, target_group(std::move(target_group))
, created(Event::now()) {};


/// The identifiers for the reaction that was executed
std::shared_ptr<const threading::ReactionIdentifiers> identifiers;
/// The id of this reaction
NUClear::id_t reaction_id{0};
/// The task id of this reaction
NUClear::id_t task_id{0};
/// The reaction id of the reaction that caused this one or 0 if there was not one
NUClear::id_t cause_reaction_id{0};
/// The reaction id of the task that caused this task or 0 if there was not one
NUClear::id_t cause_task_id{0};
/// The time that this reaction was emitted to the thread pool
clock::time_point emitted;
/// The time that execution started on this reaction
clock::time_point started;
/// The time that execution finished on this reaction
clock::time_point finished;
/// An exception pointer that can be rethrown (if the reaction threw an exception)
std::exception_ptr exception{nullptr};

/// The reaction/task pair that caused this reaction or 0s if it was a non NUClear cause
IDPair cause;
/// The reaction/task pair that was executed
IDPair target;

/// The thread pool that this reaction was intended to run on
util::ThreadPoolDescriptor target_threadpool;
/// The group that this reaction was intended to run in
util::GroupDescriptor target_group;

/// The time and thread information for when this reaction was created
Event created;
/// The time and thread information for when this reaction was started
Event started;
/// The time and thread information for when this reaction was finished
Event finished;

/// The exception that was thrown by this reaction or nullptr if no exception was thrown
std::exception_ptr exception;
};

struct ReactionEvent {
enum Event : uint8_t { CREATED, MISSING_DATA, BLOCKED, STARTED, FINISHED };

ReactionEvent(const Event& type, std::shared_ptr<ReactionStatistics> statistics)
: type(type), statistics(std::move(statistics)) {}

Event type;
std::shared_ptr<ReactionStatistics> statistics;
};

} // namespace message
Expand Down
22 changes: 9 additions & 13 deletions src/threading/ReactionTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,15 @@ namespace threading {
, pool_descriptor(thread_pool_fn(*this))
, group_descriptor(group_fn(*this))
// Only create a stats object if we wouldn't cause an infinite loop of stats
, stats(parent != nullptr && parent->emit_stats
&& (current_task == nullptr || current_task->stats != nullptr)
? std::make_shared<message::ReactionStatistics>(
parent->identifiers,
parent->id,
id,
current_task != nullptr ? current_task->parent->id : 0,
current_task != nullptr ? current_task->id : 0,
clock::now(),
clock::time_point(std::chrono::seconds(0)),
clock::time_point(std::chrono::seconds(0)),
nullptr)
: nullptr) {
, stats(
parent != nullptr && parent->emit_stats && (current_task == nullptr || current_task->stats != nullptr)
? std::make_shared<message::ReactionStatistics>(
parent->identifiers,
IDPair{parent->id, id},
current_task != nullptr ? IDPair{current_task->parent->id, current_task->id} : IDPair{0, 0},
pool_descriptor,
group_descriptor)
: nullptr) {
// Increment the number of active tasks
if (parent != nullptr) {
parent->active_tasks.fetch_add(1, std::memory_order_release);
Expand Down
36 changes: 28 additions & 8 deletions src/util/CallbackGenerator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ namespace util {

// Check if we should even run
if (!DSL::precondition(*task)) {

// Set the created status as rejected and emit it
if (task->stats != nullptr) {
PowerPlant::powerplant->emit<dsl::word::emit::Direct>(
std::make_unique<message::ReactionEvent>(message::ReactionEvent::BLOCKED, task->stats));
}

// Nothing to run
return nullptr;
}

Expand All @@ -87,18 +95,33 @@ namespace util {

// Check if our data is good (all the data exists) otherwise terminate the call
if (!check_data(data)) {

// Set the created status as no data and emit it
if (task->stats != nullptr) {
PowerPlant::powerplant->emit<dsl::word::emit::Direct>(
std::make_unique<message::ReactionEvent>(message::ReactionEvent::MISSING_DATA, task->stats));
}

// Nothing to run
return nullptr;
}

// Set the created status as no data and emit it
if (task->stats != nullptr) {
PowerPlant::powerplant->emit<dsl::word::emit::Direct>(
std::make_unique<message::ReactionEvent>(message::ReactionEvent::CREATED, task->stats));
}

// We have to make a copy of the callback because the "this" variable can go out of scope
auto c = callback;
task->callback = [c, data](threading::ReactionTask& task) noexcept {
// Update our thread's priority to the correct level
update_current_thread_priority(task.priority);

// Record our start time
if (task.stats != nullptr) {
task.stats->started = clock::now();
task.stats->started = message::ReactionStatistics::Event::now();
PowerPlant::powerplant->emit<dsl::word::emit::Direct>(
std::make_unique<message::ReactionEvent>(message::ReactionEvent::STARTED, task.stats));
}

// We have to catch any exceptions
Expand All @@ -113,17 +136,14 @@ namespace util {
}
}

// Our finish time
if (task.stats != nullptr) {
task.stats->finished = clock::now();
}

// Run our postconditions
DSL::postcondition(task);

// Emit our reaction statistics if it wouldn't cause a loop
if (task.stats != nullptr) {
PowerPlant::powerplant->emit_shared<dsl::word::emit::Direct>(task.stats);
task.stats->finished = message::ReactionStatistics::Event::now();
PowerPlant::powerplant->emit<dsl::word::emit::Direct>(
std::make_unique<message::ReactionEvent>(message::ReactionEvent::FINISHED, task.stats));
}
};

Expand Down
44 changes: 44 additions & 0 deletions src/util/usage_clock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "usage_clock.hpp"

#include <chrono>

// Windows
#if defined(_WIN32)
#include "platform.hpp"

namespace NUClear {
namespace util {

cpu_clock::time_point cpu_clock::now() noexcept {
FILETIME creation_time;
FILETIME exit_time;
FILETIME kernel_time;
FILETIME user_time;
if (GetThreadTimes(GetCurrentThread(), &creation_time, &exit_time, &kernel_time, &user_time) != -1) {
// Time in in 100 nanosecond intervals
uint64_t time = ((uint64_t(user_time.dwHighDateTime) << 32) | user_time.dwLowDateTime)
+ ((uint64_t(kernel_time.dwHighDateTime) << 32) | kernel_time.dwLowDateTime);
return time_point(std::chrono::duration<uint64_t, std::ratio<1LL, 10000000LL>>(time));
}
return time_point();
}

} // namespace util
} // namespace NUClear

#else
#include <ctime>

namespace NUClear {
namespace util {

cpu_clock::time_point cpu_clock::now() noexcept {
::timespec ts{};
::clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
return time_point(std::chrono::seconds(ts.tv_sec) + std::chrono::nanoseconds(ts.tv_nsec));
}

} // namespace util
} // namespace NUClear

#endif // _WIN32
30 changes: 30 additions & 0 deletions src/util/usage_clock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef NUCLEAR_UTIL_USAGE_CLOCK_HPP
#define NUCLEAR_UTIL_USAGE_CLOCK_HPP

#include <chrono>

namespace NUClear {
namespace util {

/**
* A clock that measures CPU time.
*/
struct cpu_clock {
using duration = std::chrono::nanoseconds; ///< The duration type of the clock.
using rep = duration::rep; ///< The representation type of the duration.
using period = duration::period; ///< The tick period of the clock.
using time_point = std::chrono::time_point<cpu_clock>; ///< The time point type of the clock.
static const bool is_steady = true; ///< Indicates if the clock is steady.

/**
* Get the current time point of the cpu clock for the current thread
*
* @return The current time point.
*/
static time_point now() noexcept;
};

} // namespace util
} // namespace NUClear

#endif // NUCLEAR_UTIL_USAGE_CLOCK_HPP
14 changes: 11 additions & 3 deletions tests/tests/api/ReactionStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,34 @@ template <int id>
struct Message {};
struct LoopMessage {};

using NUClear::message::ReactionStatistics;
using NUClear::message::ReactionEvent;

class TestReactor : public test_util::TestBase<TestReactor> {
public:
TestReactor(std::unique_ptr<NUClear::Environment> environment) : TestBase(std::move(environment)) {

// This reaction is here to emit something from a ReactionStatistics trigger
// This shouldn't cause reaction statistics of their own otherwise everything would explode
on<Trigger<ReactionStatistics>>().then("Loop Statistics", [this](const ReactionStatistics&) {
on<Trigger<ReactionEvent>>().then("Loop Statistics", [this](const ReactionEvent&) { //
emit(std::make_unique<LoopMessage>());
});
on<Trigger<LoopMessage>>().then("No Statistics", [] {});


on<Trigger<ReactionStatistics>>().then("Reaction Stats Handler", [this](const ReactionStatistics& stats) {
on<Trigger<ReactionEvent>>().then("Reaction Stats Handler", [this](const ReactionEvent& event) {
const auto& stats = *event.statistics;

// Other reactions statistics run on this because of built in NUClear reactors (e.g. chrono controller etc)
// We want to filter those out so only our own stats are shown
if (stats.identifiers->name.empty() || stats.identifiers->reactor != reactor_name) {
return;
}

// Skip stats until the finished event
if (event.type != ReactionEvent::FINISHED) {
return;
}

events.push_back("Stats for " + stats.identifiers->name + " from " + stats.identifiers->reactor);
events.push_back(stats.identifiers->dsl);

Expand Down
Loading

0 comments on commit 03d3138

Please sign in to comment.