Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidekiq:Add trace correlation for internal logs #3823

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,21 @@ end
| `on_error` | | `Proc` | Custom error handler invoked when a job raises an error. Provided `span` and `error` as arguments. Sets error on the span by default. Useful for ignoring transient errors. | `proc { \|span, error\| span.set_error(error) unless span.nil? }` |
| `quantize` | | `Hash` | Hash containing options for quantization of job arguments. | `{}` |

#### Log Correlation

To correlate Sidekiq logs with traces, you can configure [Sidekiq's logger](https://github.com/sidekiq/sidekiq/wiki/Logging)
to either use an [existing supported logger instance](#for-logging-in-rails-applications) or use Sidekiq's JSON logger formatter:

```ruby
Sidekiq.configure_client do |config|
config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new
end

Sidekiq.configure_server do |config|
config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new
end
```

### Sinatra

The Sinatra integration traces requests and template rendering.
Expand Down
7 changes: 7 additions & 0 deletions lib/datadog/tracing/contrib/sidekiq/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ def patch

config.server_middleware do |chain|
chain.add(Sidekiq::ServerTracer)

# Patch late to ensure `Sidekiq::Processor` is loaded.
# Used for log correlation.
::Sidekiq::Processor.prepend(Sidekiq::ServerTracer::Processor)
end

patch_server_internals if Integration.compatible_with_server_internal_tracing?

# Patch for log correlation
::Sidekiq::Logger::Formatters::JSON.prepend(Sidekiq::ServerTracer::JSONFormatter)
end
end

Expand Down
48 changes: 42 additions & 6 deletions lib/datadog/tracing/contrib/sidekiq/server_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,12 @@ class ServerTracer
def initialize(options = {})
@sidekiq_service = options[:service_name] || configuration[:service_name]
@on_error = options[:on_error] || configuration[:on_error]
@distributed_tracing = options[:distributed_tracing] || configuration[:distributed_tracing]
@quantize = options[:quantize] || configuration[:quantize]
end

def call(worker, job, queue)
resource = job_resource(job)

if @distributed_tracing
trace_digest = Sidekiq.extract(job)
Datadog::Tracing.continue_trace!(trace_digest)
end

Datadog::Tracing.trace(
Ext::SPAN_JOB,
service: @sidekiq_service,
Expand Down Expand Up @@ -81,6 +75,48 @@ def propagation
def configuration
Datadog.configuration.tracing[:sidekiq]
end

# Since Sidekiq 5, the server logger runs before any middleware is run.
# (https://github.com/sidekiq/sidekiq/blob/40de8236e927d752fc1ec5d220f276a9b4b5c84b/lib/sidekiq/processor.rb#L135)
# Due of this, we cannot create a trace early enough using middlewares that allow log correlation to work
# A way around it is to create a TraceOperation early (and thus a `trace_id`), and let the middleware handle
# the span creation.
# This works because logs are correlated on the `trace_id`, not `span_id`.
module Processor
# Copy visibility from Sidekiq::Processor's class declaration, to ensure
# we are declaring `dispatch` with the correct visibility. Only applicable in testing mode.
# @see https://github.com/sidekiq/sidekiq/blob/40de8236e927d752fc1ec5d220f276a9b4b5c84b/lib/sidekiq/processor.rb#L68
private if defined?($TESTING) && $TESTING # rubocop:disable Layout/EmptyLinesAroundAccessModifier, Style/GlobalVars

# The main method used by Sidekiq to process jobs.
# The Sidekiq logger runs inside this method.
# @see Sidekiq::Processor#dispatch
def dispatch(*args, **kwargs, &block)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Optional arguments should appear at the end (...read more)

The rule "Optional arguments should appear at the end" is an important programming practice in Ruby. It is used to ensure that the optional parameters in a method definition are placed after the required parameters. This rule is important because when a method is called, Ruby fills in the arguments from left to right. If an optional argument is placed before a required argument and the method is called with fewer arguments, Ruby will assign the provided arguments to the optional parameters, leaving the required parameters without values and causing an error.

Non-compliance with this rule often leads to unexpected behavior or bugs in the code, which can be quite challenging to debug. This is particularly true when the method is called with fewer arguments than defined. The errors caused by this can be hard to track down, as they may not occur at the place where the method is defined, but rather in some distant place where the method is called.

To avoid this, always place optional parameters at the end of the list of parameters in your method definitions. This way, Ruby will fill in the required parameters first, and only then use any remaining arguments for the optional ones. If there are no remaining arguments, the optional parameters will be set to their default values. This keeps your code clear, predictable, and free of unnecessary bugs.

View in Datadog  Leave us feedback  Documentation

if Datadog.configuration.tracing[:sidekiq][:distributed_tracing]
trace_digest = Sidekiq.extract(args[0]) rescue nil
marcotc marked this conversation as resolved.
Show resolved Hide resolved
end

Datadog::Tracing.continue_trace!(trace_digest)

super
end
end

# Performs log correlation injecting for Sidekiq.
# Currently only supports Sidekiq's JSON formatter.
module JSONFormatter
SKIP_FIRST_STRING_CHAR = (1..-1).freeze

def call(severity, time, program_name, message)
entry = super

# Concatenate the correlation with the JSON string log entry,
# since there's no way to inject the correlation values into
# the original JSON.
correlation = ::Sidekiq.dump_json(Tracing.correlation.to_h)
"#{correlation.chop},#{entry[SKIP_FIRST_STRING_CHAR]}"
end
end
end
end
end
Expand Down
177 changes: 62 additions & 115 deletions spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,24 @@
require 'datadog/tracing/contrib/sidekiq/server_tracer'

require 'sidekiq/testing'
require_relative 'support/helper'
require_relative 'support/legacy_test_helpers' if Sidekiq::VERSION < '4'
require 'sidekiq/api'

RSpec.describe 'Sidekiq distributed tracing' do
around do |example|
Sidekiq::Testing.fake! do
Sidekiq::Testing.server_middleware.clear
Sidekiq::Testing.server_middleware do |chain|
chain.add(Datadog::Tracing::Contrib::Sidekiq::ServerTracer)
end
include_context 'Sidekiq server'

example.run
end
end

after do
Datadog.configuration.tracing[:sidekiq].reset!
Sidekiq::Queues.clear_all
end

let!(:empty_worker) do
before do
stub_const(
'EmptyWorker',
'PropagationWorker',
Class.new do
include Sidekiq::Worker
def perform; end
def perform
# Save the trace digest in the job span for future inspection
data = {}
Datadog::Tracing::Contrib::Sidekiq.inject(Datadog::Tracing.active_trace.to_digest, data)
Datadog::Tracing.active_span.set_tag('digest', data.to_json)
end
end
)
end
Expand All @@ -43,6 +35,12 @@ def perform; end
end

context 'when dispatching' do
before do
configure_sidekiq
Sidekiq::Testing.fake!
Sidekiq::Queues.clear_all
end

it 'propagates through serialized job' do
EmptyWorker.perform_async

Expand All @@ -65,59 +63,31 @@ def perform; end
end
end

context 'when receiving' do
let(:trace_id) { Datadog::Tracing::Utils::TraceId.next_id }
let(:span_id) { Datadog::Tracing::Utils.next_id }
let(:jid) { '123abc' }

it 'continues trace from serialized job' do
Sidekiq::Queues.push(
EmptyWorker.queue,
EmptyWorker.to_s,
EmptyWorker.sidekiq_options.merge(
'args' => [],
'class' => EmptyWorker.to_s,
'jid' => jid,
'x-datadog-trace-id' => low_order_trace_id(trace_id).to_s,
'x-datadog-parent-id' => span_id.to_s,
'x-datadog-sampling-priority' => '2',
'x-datadog-tags' => "_dd.p.dm=-99,_dd.p.tid=#{high_order_hex_trace_id(trace_id)}",
'x-datadog-origin' => 'my-origin'
)
)

EmptyWorker.perform_one

expect(span.trace_id).to eq(trace_id)
expect(span.parent_id).to eq(span_id)
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('job')
expect(span.get_tag('span.kind')).to eq('consumer')

expect(trace.send(:meta)['_dd.p.dm']).to eq('-99')
expect(trace.sampling_priority).to eq(2)
expect(trace.origin).to eq('my-origin')
end
end

context 'round trip' do
it 'creates 2 spans for a distributed trace' do
EmptyWorker.perform_async
EmptyWorker.perform_one
expect_in_sidekiq_server do
Datadog::Tracing.trace('test setup') do |_span, trace|
trace.sampling_priority = 2
trace.origin = 'my-origin'
trace.set_tag('_dd.p.dm', '-99')

expect(spans).to have(2).items
PropagationWorker.perform_async
end

job_span, push_span = spans
job_span = fetch_job_span
push_span = spans.find { |s| s.name == 'sidekiq.push' }

expect(push_span).to be_root_span
expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))
expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))

expect(job_span.trace_id).to eq(push_span.trace_id)
expect(job_span.parent_id).to eq(push_span.id)
expect(job_span.trace_id).to eq(push_span.trace_id)
expect(job_span.parent_id).to eq(push_span.id)

digest = Datadog::Tracing::Contrib::Sidekiq.extract(JSON.parse(job_span.get_tag('digest')))

expect(digest.trace_distributed_tags['_dd.p.dm']).to eq('-99')
expect(digest.trace_sampling_priority).to eq(2)
expect(digest.trace_origin).to eq('my-origin')
end
end
end
end
Expand All @@ -130,6 +100,12 @@ def perform; end
end

context 'when dispatching' do
before do
configure_sidekiq
Sidekiq::Testing.fake!
Sidekiq::Queues.clear_all
end

it 'does not propagate through serialized job' do
EmptyWorker.perform_async

Expand All @@ -152,63 +128,34 @@ def perform; end
end
end

context 'when receiving' do
let(:trace_id) { Datadog::Tracing::Utils::TraceId.next_id }
let(:span_id) { Datadog::Tracing::Utils.next_id }
let(:jid) { '123abc' }

it 'does not continue trace from serialized job' do
Sidekiq::Queues.push(
EmptyWorker.queue,
EmptyWorker.to_s,
EmptyWorker.sidekiq_options.merge(
'args' => [],
'class' => EmptyWorker.to_s,
'jid' => jid,
'x-datadog-trace-id' => trace_id.to_s,
'x-datadog-parent-id' => span_id.to_s,
'x-datadog-sampling-priority' => '2',
'x-datadog-tags' => '_dd.p.dm=99',
'x-datadog-origin' => 'my-origin'
)
)

EmptyWorker.perform_one

expect(span).to be_root_span
expect(span.trace_id).not_to eq(trace_id)
expect(span.parent_id).to eq(0)
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('job')
expect(span.get_tag('span.kind')).to eq('consumer')

expect(trace.send(:meta)['_dd.p.dm']).to eq('-0')
expect(trace.sampling_priority).to eq(1)
expect(trace.origin).to be_nil
end
end

context 'round trip' do
it 'creates 2 spans with separate traces' do
EmptyWorker.perform_async
EmptyWorker.perform_one
expect_in_sidekiq_server do
Datadog::Tracing.trace('test setup') do |_span, trace|
trace.sampling_priority = 2
trace.origin = 'my-origin'
trace.set_tag('_dd.p.dm', '-99')

PropagationWorker.perform_async
end

job_span = fetch_job_span
push_span = spans.find { |s| s.name == 'sidekiq.push' }

expect(spans).to have(2).items
expect(push_span.trace_id).to_not eq(job_span.trace_id)
expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))

job_span, push_span = spans
expect(job_span.resource).to eq('PropagationWorker')

expect(push_span.trace_id).to_not eq(job_span.trace_id)
expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))
expect(job_span).to be_root_span
expect(job_span.resource).to eq('PropagationWorker')

expect(push_span).to be_root_span
expect(job_span.resource).to eq('EmptyWorker')
digest = Datadog::Tracing::Contrib::Sidekiq.extract(JSON.parse(job_span.get_tag('digest')))

expect(job_span).to be_root_span
expect(job_span.resource).to eq('EmptyWorker')
expect(digest.trace_distributed_tags['_dd.p.dm']).to_not eq('-99')
expect(digest.trace_sampling_priority).to_not eq(2)
expect(digest.trace_origin).to_not eq('my-origin')
end
end
end
end
Expand Down
37 changes: 37 additions & 0 deletions spec/datadog/tracing/contrib/sidekiq/logging_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'datadog/tracing/contrib/support/spec_helper'
require_relative 'support/helper'

RSpec.describe 'Sidekiq Logging' do
include_context 'Sidekiq server'

before do
stub_const(
'EmptyWorker',
Class.new do
include Sidekiq::Worker
def perform
logger.info('Running EmptyWorker')
end
end
)
end

it 'traces the looping job fetching' do
expect_in_sidekiq_server(log_level: Logger::INFO) do
EmptyWorker.perform_async

span = try_wait_until { fetch_spans.find { |s| s.name == 'sidekiq.job' } }

# Traces in propagation can get truncated to 64-bits by default
trace_id = Datadog::Tracing::Utils::TraceId.to_low_order(span.trace_id).to_s
stdout = File.read($stdout)

expect(stdout).to match(/"trace_id":"#{trace_id}".*start/)

expect(stdout).to match(/"trace_id":"#{trace_id}".*Running EmptyWorker/)
expect(stdout).to match(/"span_id":"#{span.id}".*Running EmptyWorker/)

expect(stdout).to match(/"trace_id":"#{trace_id}".*done/)
end
end
end
Loading
Loading