Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROF-11003] Fix allocation profiling + otel tracing causing Ruby crash #4240

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/profiler_gc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def create_profiler

# We take a dummy sample so that the context for the main thread is created, as otherwise the GC profiling methods do
# not create it (because we don't want to do memory allocations in the middle of GC)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, Thread.current)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, Thread.current, false)
end

def run_benchmark
Expand All @@ -29,7 +29,7 @@ def run_benchmark
x.report('profiler gc') do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_start(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_finish(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false, false)
end

x.save! "#{File.basename(__FILE__)}-results.json" unless VALIDATE_BENCHMARK_MODE
Expand All @@ -52,7 +52,7 @@ def run_benchmark
estimated_gc_per_minute.times do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_start(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_finish(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false, false)
end

@recorder.serialize
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_gvl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def initialize
@target_thread = thread_with_very_deep_stack

# Sample once to trigger thread context creation for all threads (including @target_thread)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

def create_profiler
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_loop_v2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def run_benchmark
)

x.report("stack collector #{ENV['CONFIG']}") do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

x.save! "#{File.basename(__FILE__)}-results.json" unless VALIDATE_BENCHMARK_MODE
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_serialize.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def run_benchmark
simulate_seconds = 60

(samples_per_second * simulate_seconds).times do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

@recorder.serialize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,16 @@ static void on_newobj_event(DDTRACE_UNUSED VALUE unused1, DDTRACE_UNUSED void *u
return;
}

// If Ruby is in the middle of raising an exception, we don't want to try to sample. This is because if we accidentally
// trigger an exception inside the profiler code, bad things will happen (specifically, Ruby will try to kill off the
// thread even though we may try to catch the exception).
//
// Note that "in the middle of raising an exception" means the exception itself has already been allocated.
// What's getting allocated now is probably the backtrace objects (@ivoanjo or at least that's what I've observed)
if (is_raised_flag_set(rb_thread_current())) {
return;
}

// Hot path: Dynamic sampling rate is usually enabled and the sampling decision is usually false
if (RB_LIKELY(state->dynamic_sampling_rate_enabled && !discrete_dynamic_sampler_should_sample(&state->allocation_sampler))) {
state->stats.allocation_skipped++;
Expand Down
119 changes: 108 additions & 11 deletions ext/datadog_profiling_native_extension/collectors_thread_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "private_vm_api_access.h"
#include "stack_recorder.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Used to trigger sampling of threads, based on external "events", such as:
// * periodic timer for cpu-time and wall-time
Expand Down Expand Up @@ -203,10 +204,10 @@ static int hash_map_per_thread_context_mark(st_data_t key_thread, st_data_t _val
static int hash_map_per_thread_context_free_values(st_data_t _thread, st_data_t value_per_thread_context, st_data_t _argument);
static VALUE _native_new(VALUE klass);
static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _self);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception);
static VALUE _native_on_gc_start(VALUE self, VALUE collector_instance);
static VALUE _native_on_gc_finish(VALUE self, VALUE collector_instance);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception);
static void update_metrics_and_sample(
struct thread_context_collector_state *state,
VALUE thread_being_sampled,
Expand Down Expand Up @@ -290,6 +291,7 @@ static void otel_without_ddtrace_trace_identifiers_for(
);
static struct otel_span otel_span_from(VALUE otel_context, VALUE otel_current_span_key);
static uint64_t otel_span_id_to_uint(VALUE otel_span_id);
static VALUE safely_lookup_hash_without_going_into_ruby_code(VALUE hash, VALUE key);

void collectors_thread_context_init(VALUE profiling_module) {
VALUE collectors_module = rb_define_module_under(profiling_module, "Collectors");
Expand All @@ -310,11 +312,11 @@ void collectors_thread_context_init(VALUE profiling_module) {
rb_define_singleton_method(collectors_thread_context_class, "_native_initialize", _native_initialize, -1);
rb_define_singleton_method(collectors_thread_context_class, "_native_inspect", _native_inspect, 1);
rb_define_singleton_method(collectors_thread_context_class, "_native_reset_after_fork", _native_reset_after_fork, 1);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 2);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 3);
rb_define_singleton_method(testing_module, "_native_sample_allocation", _native_sample_allocation, 3);
rb_define_singleton_method(testing_module, "_native_on_gc_start", _native_on_gc_start, 1);
rb_define_singleton_method(testing_module, "_native_on_gc_finish", _native_on_gc_finish, 1);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 2);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 3);
rb_define_singleton_method(testing_module, "_native_thread_list", _native_thread_list, 0);
rb_define_singleton_method(testing_module, "_native_per_thread_context", _native_per_thread_context, 1);
rb_define_singleton_method(testing_module, "_native_stats", _native_stats, 1);
Expand Down Expand Up @@ -504,31 +506,49 @@ static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _sel

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread) {
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception) {
ENFORCE_BOOLEAN(allow_exception);

if (!is_thread_alive(profiler_overhead_stack_thread)) rb_raise(rb_eArgError, "Unexpected: profiler_overhead_stack_thread is not alive");

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample(collector_instance, monotonic_wall_time_now_ns(RAISE_ON_FAILURE), profiler_overhead_stack_thread);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_start(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

thread_context_collector_on_gc_start(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_finish(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

(void) !thread_context_collector_on_gc_finish(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state) {
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception) {
ENFORCE_BOOLEAN(reset_monotonic_to_system_state);
ENFORCE_BOOLEAN(allow_exception);

struct thread_context_collector_state *state;
TypedData_Get_Struct(collector_instance, struct thread_context_collector_state, &thread_context_collector_typed_data, state);
Expand All @@ -537,7 +557,12 @@ static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_
state->time_converter_state = (monotonic_to_system_epoch_state) MONOTONIC_TO_SYSTEM_EPOCH_INITIALIZER;
}

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample_after_gc(collector_instance);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -982,7 +1007,13 @@ static void trigger_sample_for_thread(
// It SHOULD NOT be used for other purposes.
static VALUE _native_thread_list(DDTRACE_UNUSED VALUE _self) {
VALUE result = rb_ary_new();

debug_enter_unsafe_context();

ddtrace_thread_list(result);

debug_leave_unsafe_context();

return result;
}

Expand Down Expand Up @@ -1501,7 +1532,12 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_allocation(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE sample_weight, VALUE new_object) {
debug_enter_unsafe_context();

thread_context_collector_sample_allocation(collector_instance, NUM2UINT(sample_weight), new_object);

debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -1597,7 +1633,7 @@ static void ddtrace_otel_trace_identifiers_for(
// trace and span representing it. Each ddtrace trace is then connected to the previous otel span, forming a linked
// list. The local root span is going to be the trace/span we find at the end of this linked list.
while (otel_values != Qnil) {
VALUE otel_span = rb_hash_lookup(otel_values, otel_current_span_key);
VALUE otel_span = safely_lookup_hash_without_going_into_ruby_code(otel_values, otel_current_span_key);
if (otel_span == Qnil) break;
VALUE next_trace = rb_ivar_get(otel_span, at_datadog_trace_id);
if (next_trace == Qnil) break;
Expand Down Expand Up @@ -1640,7 +1676,12 @@ void thread_context_collector_sample_skipped_allocation_samples(VALUE self_insta
}

static VALUE _native_sample_skipped_allocation_samples(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE skipped_samples) {
debug_enter_unsafe_context();

thread_context_collector_sample_skipped_allocation_samples(collector_instance, NUM2UINT(skipped_samples));

debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -1709,7 +1750,7 @@ static void otel_without_ddtrace_trace_identifiers_for(

VALUE root_span_type = rb_ivar_get(local_root_span.span, at_kind_id /* @kind */);
// We filter out spans that don't have `kind: :server`
if (root_span_type == Qnil || !RB_TYPE_P(root_span_type, T_SYMBOL) || SYM2ID(root_span_type) != server_id) return;
if (root_span_type == Qnil || !RB_TYPE_P(root_span_type, T_SYMBOL) || !RB_STATIC_SYM_P(root_span_type) || SYM2ID(root_span_type) != server_id) return;

VALUE trace_resource = rb_ivar_get(local_root_span.span, at_name_id /* @name */);
if (!RB_TYPE_P(trace_resource, T_STRING)) return;
Expand All @@ -1726,7 +1767,7 @@ static struct otel_span otel_span_from(VALUE otel_context, VALUE otel_current_sp
if (context_entries == Qnil || !RB_TYPE_P(context_entries, T_HASH)) return failed;

// If it exists, context_entries is expected to be a Hash[OpenTelemetry::Context::Key, OpenTelemetry::Trace::Span]
VALUE span = rb_hash_lookup(context_entries, otel_current_span_key);
VALUE span = safely_lookup_hash_without_going_into_ruby_code(context_entries, otel_current_span_key);
if (span == Qnil) return failed;

// If it exists, span_context is expected to be a OpenTelemetry::Trace::SpanContext (don't confuse it with OpenTelemetry::Context)
Expand Down Expand Up @@ -1979,31 +2020,53 @@ static uint64_t otel_span_id_to_uint(VALUE otel_span_id) {
static VALUE _native_on_gvl_waiting(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

thread_context_collector_on_gvl_waiting(thread_from_thread_object(thread));

debug_leave_unsafe_context();

return Qnil;
}

static VALUE _native_gvl_waiting_at_for(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

intptr_t gvl_waiting_at = gvl_profiling_state_thread_object_get(thread);

debug_leave_unsafe_context();

return LONG2NUM(gvl_waiting_at);
}

static VALUE _native_on_gvl_running(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;
debug_enter_unsafe_context();

VALUE result = thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;

debug_leave_unsafe_context();

return result;
}

static VALUE _native_sample_after_gvl_running(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_sample_after_gvl_running(
debug_enter_unsafe_context();

VALUE result = thread_context_collector_sample_after_gvl_running(
collector_instance,
thread,
monotonic_wall_time_now_ns(RAISE_ON_FAILURE)
);

debug_leave_unsafe_context();

return result;
}

static VALUE _native_apply_delta_to_cpu_time_at_previous_sample_ns(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread, VALUE delta_ns) {
Expand All @@ -2030,3 +2093,37 @@ static uint64_t otel_span_id_to_uint(VALUE otel_span_id) {
DDTRACE_UNUSED long current_cpu_time_ns
) { return false; }
#endif // NO_GVL_INSTRUMENTATION

#define MAX_SAFE_LOOKUP_SIZE 16

typedef struct { VALUE lookup_key; VALUE result; } safe_lookup_hash_state;

static int safe_lookup_hash_iterate(VALUE key, VALUE value, VALUE state_ptr) {
safe_lookup_hash_state *state = (safe_lookup_hash_state *) state_ptr;

if (key == state->lookup_key) {
state->result = value;
return ST_STOP;
}

return ST_CONTINUE;
}

// This method exists because we need to look up a hash during sampling, but we don't want to invoke any
// Ruby code as a side effect. To be able to look up by hash, `rb_hash_lookup` calls `#hash` on the key,
// which we want to avoid.
// Thus, instead, we opt to just iterate through the hash and check if we can find what we're looking for.
//
// To avoid having too much overhead here we only iterate in hashes up to MAX_SAFE_LOOKUP_SIZE.
// Note that we don't even try to iterate if the hash is bigger -- this is to avoid flaky behavior where
// depending on the internal storage order of the hash we may or not find the key, and instead we always
// enforce the size.
static VALUE safely_lookup_hash_without_going_into_ruby_code(VALUE hash, VALUE key) {
if (!RB_TYPE_P(hash, T_HASH) || RHASH_SIZE(hash) > MAX_SAFE_LOOKUP_SIZE) return Qnil;

safe_lookup_hash_state state = {.lookup_key = key, .result = Qnil};

rb_hash_foreach(hash, safe_lookup_hash_iterate, (VALUE) &state);

return state.result;
}
Original file line number Diff line number Diff line change
Expand Up @@ -800,3 +800,6 @@ static inline int ddtrace_imemo_type(VALUE imemo) {
return current_thread;
}
#endif

// Is the VM smack in the middle of raising an exception?
bool is_raised_flag_set(VALUE thread) { return thread_struct_from_object(thread)->ec->raised_flag > 0; }
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ const char *imemo_kind(VALUE imemo);

#define ENFORCE_THREAD(value) \
{ if (RB_UNLIKELY(!rb_typeddata_is_kind_of(value, RTYPEDDATA_TYPE(rb_thread_current())))) raise_unexpected_type(value, ADD_QUOTES(value), "Thread", __FILE__, __LINE__, __func__); }

bool is_raised_flag_set(VALUE thread);
2 changes: 2 additions & 0 deletions ext/datadog_profiling_native_extension/profiling.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ruby_helpers.h"
#include "setup_signal_handler.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Each class/module here is implemented in their separate file
void collectors_cpu_and_wall_time_worker_init(VALUE profiling_module);
Expand Down Expand Up @@ -56,6 +57,7 @@ void DDTRACE_EXPORT Init_datadog_profiling_native_extension(void) {
collectors_thread_context_init(profiling_module);
http_transport_init(profiling_module);
stack_recorder_init(profiling_module);
unsafe_api_calls_check_init();

// Hosts methods used for testing the native code using RSpec
VALUE testing_module = rb_define_module_under(native_extension_module, "Testing");
Expand Down
Loading
Loading