Skip to content

Commit

Permalink
Merge branch '2.19' into backport-11950-to-2.19
Browse files Browse the repository at this point in the history
  • Loading branch information
erikayasuda authored Jan 17, 2025
2 parents 026132d + a08f68a commit 6a07e0f
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 22 deletions.
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
Expand All @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class UploaderBuilder
static void set_output_filename(std::string_view _output_filename);

static std::variant<Uploader, std::string> build();

static void postfork_child();
};

} // namespace Datadog
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ namespace Datadog {
// Child-side fork handler: gives the singleton a fresh mutex and resets its state.
//
// The mutex is copied into the child in whatever state it had at fork time, so it
// may still be marked as locked. Running ~mutex() on a mutex that is still owned is
// undefined behaviour, so the old object is deliberately NOT destroyed; a new mutex
// is simply constructed over the same storage.
void
Datadog::CodeProvenance::postfork_child()
{
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&get_instance().mtx) std::mutex(); // Recreate the mutex
    get_instance().reset();                 // Reset the state
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ddup_postfork_child()
Datadog::Uploader::postfork_child();
Datadog::SampleManager::postfork_child();
Datadog::CodeProvenance::postfork_child();
Datadog::UploaderBuilder::postfork_child();
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,6 @@ Datadog::Profile::collect(const ddog_prof_Sample& sample, int64_t endtime_ns)
// Child-side fork handler: re-initialises profile_mtx and then cycles the buffers.
//
// profile_mtx is inherited in whatever state it had at fork time. The calling thread
// is not guaranteed to be its owner, and unlocking a std::mutex that the current
// thread does not own is undefined behaviour, so no unlock() is attempted here.
void
Datadog::Profile::postfork_child()
{
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&profile_mtx) std::mutex();
    cycle_buffers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ Datadog::Sample::push_absolute_ns(int64_t _timestamp_ns)
return true;
}


bool
Datadog::Sample::push_monotonic_ns(int64_t _monotonic_ns)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,6 @@ Datadog::Uploader::postfork_parent()
// Child-side fork handler for the uploader: calls unlock() and then constructs a
// fresh std::mutex over upload_lock.
void
Datadog::Uploader::postfork_child()
{
// NOTE(review): unlock() is defined outside this view; presumably it releases
// upload_lock. If so, calling it from a thread that does not own the lock is UB and
// the placement-new below already resets the mutex, making this call redundant —
// confirm and consider removing it.
unlock();
// NB placement-new to re-init and leak the mutex because doing anything else is UB
new (&upload_lock) std::mutex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,10 @@ Datadog::UploaderBuilder::build()

return Datadog::Uploader{ output_filename, ddog_exporter };
}

// Child-side fork handler: replaces tag_mutex with a newly constructed mutex.
void
Datadog::UploaderBuilder::postfork_child()
{
    // The previous mutex object is intentionally leaked rather than destroyed or
    // unlocked; it is overwritten in place with a fresh one because any other
    // treatment of it would be UB.
    ::new (static_cast<void*>(&tag_mutex)) std::mutex{};
}
5 changes: 5 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/src/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ _stack_v2_atfork_child()
// so we don't even reveal this function to the user
_set_pid(getpid());
ThreadSpanLinks::postfork_child();

// `thread_info_map_lock` and `task_link_map_lock` are global locks held in echion
// NB placement-new to re-init and leak the mutex because doing anything else is UB
new (&thread_info_map_lock) std::mutex;
new (&task_link_map_lock) std::mutex;
}

__attribute__((constructor)) void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ ThreadSpanLinks::reset()
// Child-side fork handler: gives the singleton a fresh mutex and resets its state.
//
// The mutex is inherited in whatever state it had at fork time and may still be
// marked as locked. Destroying a mutex that is still owned is undefined behaviour,
// so the old object is NOT destroyed; a new one is constructed over its storage.
void
ThreadSpanLinks::postfork_child()
{
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&get_instance().mtx) std::mutex();

    get_instance().reset();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun were closed too early, which caused span tags to be missing.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
profiling: Fixes a bug where profiling mutexes were not cleared on fork in the child process. This could
cause deadlocks in certain configurations.
5 changes: 5 additions & 0 deletions tests/contrib/celery/run_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from tasks import fn_a
from tasks import fn_b


# Chain the immutable signatures of fn_a and fn_b (fn_a first) and enqueue the chain.
(fn_a.si() | fn_b.si()).delay()
14 changes: 14 additions & 0 deletions tests/contrib/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from celery import Celery


# Celery application named "tasks"; the task functions in this module are registered on it.
app = Celery("tasks")


# Registered on ``app`` under an explicit fully-qualified task name.
@app.task(name="tests.contrib.celery.tasks.fn_a")
def fn_a():
    # Trivial task body: always returns the constant string "a".
    return "a"


# Registered on ``app`` under an explicit fully-qualified task name.
@app.task(name="tests.contrib.celery.tasks.fn_b")
def fn_b():
    # Trivial task body: always returns the constant string "b".
    return "b"
62 changes: 62 additions & 0 deletions tests/contrib/celery/test_chained_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import re
import subprocess
import time

from celery import Celery


# Ensure that when we call Celery chains, the root span has Celery-specific span tags.
# The test_integration.py setup doesn't perfectly mimic the conditions of a running worker process.
# This test runs the worker as a side process so we can check the tracer logs afterwards for the expected span results.
# See https://github.com/DataDog/dd-trace-py/issues/11479
def test_task_chain_task_call_task():
    """Check that a chained task's worker span carries Celery-specific tags.

    Starts a Celery worker under ``ddtrace-run`` as a subprocess, runs ``run_tasks.py`` (also under
    ``ddtrace-run``) to enqueue the chain, then stops the worker and searches its captured stderr for a
    ``fn_a`` span of type ``worker`` whose tags mention ``correlation_id``.
    """
    app = Celery("tasks")

    celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
    celery_task_runner_cmd = "ddtrace-run python run_tasks.py"

    # The commands need to run from the directory where this test file lives
    current_directory = str(os.path.dirname(__file__))

    worker_process = subprocess.Popen(
        celery_worker_cmd.split(),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        preexec_fn=os.setsid,
        close_fds=True,
        cwd=current_directory,
    )

    # try/finally so the worker is always stopped, even if starting or running the task runner raises
    try:
        max_wait_time = 10
        waited_so_far = 0
        # app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running
        while app.control.inspect().active() is None and waited_so_far < max_wait_time:
            time.sleep(1)
            waited_so_far += 1

        # The task should only run after the Celery worker has sufficient time to start up
        task_runner_process = subprocess.Popen(
            celery_task_runner_cmd.split(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            preexec_fn=os.setsid,
            close_fds=True,
            cwd=current_directory,
        )

        # communicate() instead of wait(): with stdout/stderr set to PIPE, wait() can deadlock once the
        # child fills a pipe buffer that nobody is reading.
        task_runner_process.communicate()
    finally:
        # Stop the worker so its pipes reach EOF, then drain them and reap the process
        worker_process.kill()
        _, worker_logs = worker_process.communicate()

    # Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id
    # Some versions of python seem to require escaping when using `re.search`:
    old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
    new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"

    log_text = str(worker_logs)
    pattern_exists = (
        re.search(old_pattern_match, log_text) is not None or re.search(new_pattern_match, log_text) is not None
    )
    # `pattern_exists` is a bool, so assert its truth value; `is not None` would always pass.
    assert pattern_exists, "no fn_a worker span with a correlation_id tag was found in the worker logs"

0 comments on commit 6a07e0f

Please sign in to comment.