Skip to content

Commit

Permalink
fix(celery): stop closing prerun_span too soon to account for Celery …
Browse files Browse the repository at this point in the history
…chains scenario [backport 2.19] (#12001)

Backport e8aab65 from #11498 to 2.19.

We've made a few changes to handle celery context recently, including:
#10676

In particular the goal of
#10676 was to handle a
scenario where a long running task may run into an exception, preventing
it from closing.

Unfortunately, this scenario did not account for cases where tasks are
chained and may not close until later.

See: #11479 and
#11624

With this PR, the sample app in
#11479 would attach the
celery specific span back to the root span.

I also need to add tests for the chains scenario.

Related to AIDM-494

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: wantsui <[email protected]>
  • Loading branch information
github-actions[bot] and wantsui authored Jan 17, 2025
1 parent c11edb8 commit a08f68a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 14 deletions.
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
Expand All @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon leading to span tags being missing.
5 changes: 5 additions & 0 deletions tests/contrib/celery/run_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from tasks import fn_a
from tasks import fn_b


(fn_a.si() | fn_b.si()).delay()
14 changes: 14 additions & 0 deletions tests/contrib/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from celery import Celery


app = Celery("tasks")


@app.task(name="tests.contrib.celery.tasks.fn_a")
def fn_a():
return "a"


@app.task(name="tests.contrib.celery.tasks.fn_b")
def fn_b():
return "b"
62 changes: 62 additions & 0 deletions tests/contrib/celery/test_chained_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import re
import subprocess
import time

from celery import Celery


# Ensure that when we call Celery chains, the root span has celery specific span tags
# The test_integration.py setup doesn't perfectly mimic the condition of a worker process running.
# This test runs the worker as a side so we can check the tracer logs afterwards to ensure expected span results.
# See https://github.com/DataDog/dd-trace-py/issues/11479
def test_task_chain_task_call_task():
app = Celery("tasks")

celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
celery_task_runner_cmd = "ddtrace-run python run_tasks.py"

# The commands need to run from the directory where this test file lives
current_directory = str(os.path.dirname(__file__))

worker_process = subprocess.Popen(
celery_worker_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
close_fds=True,
cwd=current_directory,
)

max_wait_time = 10
waited_so_far = 0
# {app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running}
while app.control.inspect().active() is None and waited_so_far < max_wait_time:
time.sleep(1)
waited_so_far += 1

# The task should only run after the Celery worker has sufficient time to start up
task_runner_process = subprocess.Popen(
celery_task_runner_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
close_fds=True,
cwd=current_directory,
)

task_runner_process.wait()
# Kill the process so it starts to send traces to the Trace Agent
worker_process.kill()
worker_logs = worker_process.stderr.read()

# Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id
# Some versions of python seem to require escaping when using `re.search`:
old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"

pattern_exists = (
re.search(old_pattern_match, str(worker_logs)) is not None
or re.search(new_pattern_match, str(worker_logs)) is not None
)
assert pattern_exists is not None

0 comments on commit a08f68a

Please sign in to comment.