Skip to content

Commit

Permalink
Add support for distributed tracing for DelayedJob (#2233)
Browse files Browse the repository at this point in the history
  • Loading branch information
sl0thentr0py authored Jan 28, 2024
1 parent a882a60 commit 3135516
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

- Add support for distributed tracing in `sentry-delayed_job` [#2233](https://github.com/getsentry/sentry-ruby/pull/2233)
- Fix warning about default gems on Ruby 3.3.0 ([#2225](https://github.com/getsentry/sentry-ruby/pull/2225))
- Add `hint:` support to `Sentry::Rails::ErrorSubscriber` [#2235](https://github.com/getsentry/sentry-ruby/pull/2235)

Expand Down
42 changes: 35 additions & 7 deletions sentry-delayed_job/lib/sentry/delayed_job/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ class Plugin < ::Delayed::Plugin
OP_NAME = "queue.delayed_job".freeze

callbacks do |lifecycle|
lifecycle.before(:enqueue) do |job, *args, &block|
inject_trace_data(job) if Sentry.initialized?
end

lifecycle.around(:invoke_job) do |job, *args, &block|
env = extract_trace_data(job)
next block.call(job, *args) unless Sentry.initialized?

Sentry.with_scope do |scope|
Expand All @@ -20,12 +25,7 @@ class Plugin < ::Delayed::Plugin
scope.set_contexts(**contexts)
scope.set_tags("delayed_job.queue" => job.queue, "delayed_job.id" => job.id.to_s)

transaction = Sentry.start_transaction(
name: scope.transaction_name,
source: scope.transaction_source,
op: OP_NAME,
custom_sampling_context: contexts
)
transaction = start_transaction(scope, env, contexts)
scope.set_span(transaction) if transaction

begin
Expand Down Expand Up @@ -70,7 +70,7 @@ def self.generate_contexts(job)
end

def self.compute_job_class(payload_object)
if payload_object.is_a? Delayed::PerformableMethod
if payload_object.is_a?(Delayed::PerformableMethod)
klass = payload_object.object.is_a?(Class) ? payload_object.object.name : payload_object.object.class.name
"#{klass}##{payload_object.method_name}"
else
Expand All @@ -91,12 +91,40 @@ def self.report?(job)
job.attempts >= max_attempts
end

def self.start_transaction(scope, env, contexts)
options = { name: scope.transaction_name, source: scope.transaction_source, op: OP_NAME }
transaction = Sentry.continue_trace(env, **options)
Sentry.start_transaction(transaction: transaction, custom_sampling_context: contexts, **options)
end

def self.finish_transaction(transaction, status)
return unless transaction

transaction.set_http_status(status)
transaction.finish
end

def self.inject_trace_data(job)
# active job style is handled in the sentry-rails/active_job extension more generally
# if someone enqueues manually with some other job class, we cannot make assumptions unfortunately
payload_object = job.payload_object
return unless payload_object.is_a?(Delayed::PerformableMethod)

# we will add the trace data to args and remove it again
# this is hacky but it's the only reliable way to survive the YAML serialization/deserialization
payload_object.args << { sentry: Sentry.get_trace_propagation_headers }
job.payload_object = payload_object
end

def self.extract_trace_data(job)
payload_object = job.payload_object
return nil unless payload_object.is_a?(Delayed::PerformableMethod)

target_payload = payload_object.args.find { |a| a.is_a?(Hash) && a.key?(:sentry) }
return nil unless target_payload
payload_object.args.delete(target_payload)
target_payload[:sentry]
end
end
end
end
Expand Down
43 changes: 43 additions & 0 deletions sentry-delayed_job/spec/sentry/delayed_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def do_nothing

def self.class_do_nothing
end

def do_nothing_with_args(a)
end
end

it "sets correct extra/tags context for each job" do
Expand Down Expand Up @@ -404,6 +407,46 @@ def perform
event = transport.events.last
expect(event.contexts.dig(:trace, :trace_id)).to eq(transaction.contexts.dig(:trace, :trace_id))
end

context "with upstream trace" do
before do
transaction = Sentry.start_transaction
Sentry.get_current_scope.set_span(transaction)

Post.new.delay.do_nothing_with_args(1)
end

let(:parent_transaction) { Sentry.get_current_scope.span }
let(:enqueued_job) { Delayed::Backend::ActiveRecord::Job.last }

it "injects the trace propagation headers to args for PerformableMethod" do
payload_object = enqueued_job.payload_object
expect(payload_object).to be_a(Delayed::PerformableMethod)
expect(payload_object.args.last).to include(:sentry)
expect(payload_object.args.last[:sentry]["sentry-trace"]).to eq(parent_transaction.to_sentry_trace)
expect(payload_object.args.last[:sentry]["baggage"]).to eq(parent_transaction.to_baggage)
end

it "invokes the job with correct args" do
payload_object = enqueued_job.payload_object
expect(payload_object.object).to be_a(Post)
expect(payload_object.object).to receive(:do_nothing_with_args).with(1)

enqueued_job.invoke_job
end

it "continues the trace" do
enqueued_job.invoke_job

expect(transport.events.count).to eq(1)
transaction = transport.events.last

expect(transaction.transaction).to eq("Post#do_nothing_with_args")
expect(transaction.contexts.dig(:trace, :trace_id)).to eq(parent_transaction.trace_id)
expect(transaction.contexts.dig(:trace, :parent_span_id)).to eq(parent_transaction.span_id)
expect(transaction.dynamic_sampling_context).to eq(parent_transaction.get_baggage.dynamic_sampling_context)
end
end
end
end

Expand Down
1 change: 1 addition & 0 deletions sentry-delayed_job/spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "bundler/setup"
require "debug" if RUBY_VERSION.to_f >= 2.6 && RUBY_ENGINE == "ruby"
require "pry"

require "active_record"
Expand Down

0 comments on commit 3135516

Please sign in to comment.