Skip to content

Commit

Permalink
Backpressure handling (#2185)
Browse files Browse the repository at this point in the history
  • Loading branch information
sl0thentr0py authored Dec 20, 2023
1 parent 0874ca8 commit 7a6cc87
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 19 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
## Unreleased

### Features

- Enable backpressure handling by default [#2185](https://github.com/getsentry/sentry-ruby/pull/2185)

The SDK can now dynamically downsamples transactions to reduce backpressure in high
throughput systems. It starts a new `BackpressureMonitor` thread to perform some health checks
which decide to downsample (halved each time) in 10 second intervals till the system
is healthy again.

To enable this behavior, use:

```ruby
Sentry.init do |config|
# ...
config.traces_sample_rate = 1.0
config.enable_backpressure_handling = true
end
```

If your system serves heavy load, please let us know how this feature works for you!

## 5.15.2

### Bug Fixes
Expand Down
24 changes: 13 additions & 11 deletions sentry-ruby/lib/sentry-ruby.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require "sentry/hub"
require "sentry/background_worker"
require "sentry/session_flusher"
require "sentry/backpressure_monitor"
require "sentry/cron/monitor_check_ins"

[
Expand Down Expand Up @@ -72,6 +73,10 @@ def exception_locals_tp
# @return [SessionFlusher, nil]
attr_reader :session_flusher

# @!attribute [r] backpressure_monitor
# @return [BackpressureMonitor, nil]
attr_reader :backpressure_monitor

##### Patch Registration #####

# @!visibility private
Expand Down Expand Up @@ -217,17 +222,9 @@ def init(&block)
Thread.current.thread_variable_set(THREAD_LOCAL, hub)
@main_hub = hub
@background_worker = Sentry::BackgroundWorker.new(config)

@session_flusher = if config.auto_session_tracking
Sentry::SessionFlusher.new(config, client)
else
nil
end

if config.include_local_variables
exception_locals_tp.enable
end

@session_flusher = config.auto_session_tracking ? Sentry::SessionFlusher.new(config, client) : nil
@backpressure_monitor = config.enable_backpressure_handling ? Sentry::BackpressureMonitor.new(config, client) : nil
exception_locals_tp.enable if config.include_local_variables
at_exit { close }
end

Expand All @@ -246,6 +243,11 @@ def close
@session_flusher = nil
end

if @backpressure_monitor
@backpressure_monitor.kill
@backpressure_monitor = nil
end

if configuration&.include_local_variables
exception_locals_tp.disable
end
Expand Down
5 changes: 5 additions & 0 deletions sentry-ruby/lib/sentry/background_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def shutdown
@shutdown_callback&.call
end

def full?
@executor.is_a?(Concurrent::ThreadPoolExecutor) &&
@executor.remaining_capacity == 0
end

private

def _perform(&block)
Expand Down
75 changes: 75 additions & 0 deletions sentry-ruby/lib/sentry/backpressure_monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true

module Sentry
class BackpressureMonitor
include LoggingHelper

DEFAULT_INTERVAL = 10
MAX_DOWNSAMPLE_FACTOR = 10

def initialize(configuration, client, interval: DEFAULT_INTERVAL)
@interval = interval
@client = client
@logger = configuration.logger

@thread = nil
@exited = false

@healthy = true
@downsample_factor = 0
end

def healthy?
ensure_thread
@healthy
end

def downsample_factor
ensure_thread
@downsample_factor
end

def run
check_health
set_downsample_factor
end

def check_health
@healthy = !(@client.transport.any_rate_limited? || Sentry.background_worker&.full?)
end

def set_downsample_factor
if @healthy
log_debug("[BackpressureMonitor] health check positive, reverting to normal sampling") if @downsample_factor.positive?
@downsample_factor = 0
else
@downsample_factor += 1 if @downsample_factor < MAX_DOWNSAMPLE_FACTOR
log_debug("[BackpressureMonitor] health check negative, downsampling with a factor of #{@downsample_factor}")
end
end

def kill
log_debug("[BackpressureMonitor] killing monitor")

@exited = true
@thread&.kill
end

private

def ensure_thread
return if @exited
return if @thread&.alive?

@thread = Thread.new do
loop do
sleep(@interval)
run
end
end
rescue ThreadError
log_debug("[BackpressureMonitor] Thread creation failed")
@exited = true
end
end
end
7 changes: 7 additions & 0 deletions sentry-ruby/lib/sentry/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ def capture_exception_frame_locals=(value)
# @return [Boolean]
attr_accessor :auto_session_tracking

# Whether to downsample transactions automatically because of backpressure.
# Starts a new monitor thread to check health of the SDK every 10 seconds.
# Default is false
# @return [Boolean]
attr_accessor :enable_backpressure_handling

# Allowlist of outgoing request targets to which sentry-trace and baggage headers are attached.
# Default is all (/.*/)
# @return [Array<String, Regexp>]
Expand Down Expand Up @@ -358,6 +364,7 @@ def initialize
self.skip_rake_integration = false
self.send_client_reports = true
self.auto_session_tracking = true
self.enable_backpressure_handling = false
self.trusted_proxies = []
self.dsn = ENV['SENTRY_DSN']
self.spotlight = false
Expand Down
11 changes: 9 additions & 2 deletions sentry-ruby/lib/sentry/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,12 @@ def set_initial_sample_decision(sampling_context:)
if sample_rate == true
@sampled = true
else
@sampled = Random.rand < sample_rate
if Sentry.backpressure_monitor
factor = Sentry.backpressure_monitor.downsample_factor
@effective_sample_rate /= 2**factor
end

@sampled = Random.rand < @effective_sample_rate
end

if @sampled
Expand Down Expand Up @@ -257,7 +262,9 @@ def finish(hub: nil, end_timestamp: nil)
event = hub.current_client.event_from_transaction(self)
hub.capture_event(event)
else
hub.current_client.transport.record_lost_event(:sample_rate, 'transaction')
is_backpressure = Sentry.backpressure_monitor&.downsample_factor&.positive?
reason = is_backpressure ? :backpressure : :sample_rate
hub.current_client.transport.record_lost_event(reason, 'transaction')
end
end

Expand Down
7 changes: 6 additions & 1 deletion sentry-ruby/lib/sentry/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class Transport
:sample_rate,
:before_send,
:event_processor,
:insufficient_data
:insufficient_data,
:backpressure
]

include LoggingHelper
Expand Down Expand Up @@ -119,6 +120,10 @@ def is_rate_limited?(item_type)
!!delay && delay > Time.now
end

def any_rate_limited?
@rate_limits.values.any? { |t| t && t > Time.now }
end

def envelope_from_event(event)
# Convert to hash
event_payload = event.to_hash
Expand Down
20 changes: 20 additions & 0 deletions sentry-ruby/spec/sentry/background_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,24 @@
expect(string_io.string).to match(/Shutting down background worker/)
end
end

describe "#full?" do
it "returns false if not a thread pool" do
configuration.background_worker_threads = 0
worker = described_class.new(configuration)
expect(worker.full?).to eq(false)
end

it "returns true if thread pool and full" do
configuration.background_worker_threads = 1
configuration.background_worker_max_queue = 1
worker = described_class.new(configuration)
expect(worker.full?).to eq(false)

2.times { worker.perform { sleep 0.1 } }
expect(worker.full?).to eq(true)
sleep 0.2
expect(worker.full?).to eq(false)
end
end
end
117 changes: 117 additions & 0 deletions sentry-ruby/spec/sentry/backpressure_monitor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
require 'spec_helper'

RSpec.describe Sentry::BackpressureMonitor do
let(:string_io) { StringIO.new }

before do
perform_basic_setup do |config|
config.enable_backpressure_handling = true
config.logger = Logger.new(string_io)
end
end

let(:configuration) { Sentry.configuration }
let(:client) { Sentry.get_current_client }
let(:transport) { client.transport }
let(:background_worker) { Sentry.background_worker }

subject { described_class.new(configuration, client) }

describe '#healthy?' do
it 'returns true by default' do
expect(subject.healthy?).to eq(true)
end

it 'returns false when unhealthy' do
expect(transport).to receive(:any_rate_limited?).and_return(true)
subject.run
expect(subject.healthy?).to eq(false)
end

it 'spawns new thread' do
expect { subject.healthy? }.to change { Thread.list.count }.by(1)
expect(subject.instance_variable_get(:@thread)).to be_a(Thread)
end

it 'spawns only one thread' do
expect { subject.healthy? }.to change { Thread.list.count }.by(1)
thread = subject.instance_variable_get(:@thread)
expect(thread).to receive(:alive?).and_return(true)
expect { subject.healthy? }.to change { Thread.list.count }.by(0)
end

context 'when thread creation fails' do
before do
expect(Thread).to receive(:new).and_raise(ThreadError)
end

it 'does not create new thread' do
expect { subject.healthy? }.to change { Thread.list.count }.by(0)
end

it 'returns true (the default)' do
expect(subject.healthy?).to eq(true)
end

it 'logs error' do
subject.healthy?
expect(string_io.string).to match(/\[BackpressureMonitor\] Thread creation failed/)
end
end

context 'when killed' do
before { subject.kill }

it 'returns true (the default)' do
expect(subject.healthy?).to eq(true)
end

it 'does not create new thread' do
expect(Thread).not_to receive(:new)
expect { subject.healthy? }.to change { Thread.list.count }.by(0)
end
end
end

# thread behavior is tested above in healthy?
describe '#downsample_factor' do
it 'returns 0 by default' do
expect(subject.downsample_factor).to eq(0)
end

it 'increases when unhealthy upto limit' do
allow(transport).to receive(:any_rate_limited?).and_return(true)

10.times do |i|
subject.run
expect(subject.downsample_factor).to eq(i + 1)
end

2.times do |i|
subject.run
expect(subject.downsample_factor).to eq(10)
end
end
end

describe '#run' do
it 'logs behavior' do
allow(background_worker).to receive(:full?).and_return(true)
subject.run
expect(string_io.string).to match(/\[BackpressureMonitor\] health check negative, downsampling with a factor of 1/)

allow(background_worker).to receive(:full?).and_return(false)
subject.run
expect(string_io.string).to match(/\[BackpressureMonitor\] health check positive, reverting to normal sampling/)
end
end

describe '#kill' do
it 'kills the thread and logs a message' do
subject.healthy?
expect(subject.instance_variable_get(:@thread)).to receive(:kill)
subject.kill
expect(string_io.string).to match(/\[BackpressureMonitor\] killing monitor/)
end
end
end
Loading

0 comments on commit 7a6cc87

Please sign in to comment.