Skip to content

Commit

Permalink
chore(telemetry): queue app-started on first sent payload (#9767)
Browse files Browse the repository at this point in the history
## Background

The instrumentation telemetry writer is enabled when `ddtrace` is
imported. This is done so all products can begin queuing telemetry
metrics, logs, and configurations immediately. However,
`TelemetryWriter.app_started` is called when the first trace is sent to
the agent (when the AgentWriter is started). This delay allows us to
capture startup errors and configurations.

## Description

- Ensures `telemetry_writer.app_started()` is called when the first
CIVisibility, Profiling, or LLMOBS payload is sent to the agent.
Previously, `app-started` (and `app-closing`) events were only sent when
the first trace was submitted. With this change, `app-started` and
`app-closing` events are also sent when tracing is disabled or products are
run with a Datadog agent.

- Renames `TelemetryWriter._app_started_event` to
`TelemetryWriter.app_started`. The `_app_started_event` method is not
internal to the telemetry writer (it is called from other modules), so the
leading underscore was misleading. This name change should make the method
easier to understand.

## Checklist

- [x] The PR description includes an overview of the change
- [x] The PR description articulates the motivation for the change
- [x] The change includes tests OR the PR description describes a
testing strategy
- [x] The PR description notes risks associated with the change, if any
- [x] Newly-added code is easy to change
- [x] The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- [x] The change includes or references documentation updates if
necessary
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Newly-added code is easy to change
- [x] Release note makes sense to a user of the library
- [x] If necessary, author has acknowledged and discussed the
performance implications of this PR as reported in the benchmarks PR
comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
mabdinur authored Aug 8, 2024
1 parent 0399bd2 commit 7b24081
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 18 deletions.
8 changes: 3 additions & 5 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _telemetry_entry(self, cfg_name: str) -> Tuple[str, str, _ConfigSource]:
raise ValueError("Unknown configuration item: %s" % cfg_name)
return name, value, item.source()

def _app_started_event(self, register_app_shutdown=True):
def app_started(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked or self.started:
Expand Down Expand Up @@ -770,9 +770,7 @@ def periodic(self, force_flush=False, shutting_down=False):
self._generate_logs_event(logs_metrics)

# Telemetry metrics and logs should be aggregated into payloads every time periodic is called.
# This ensures metrics and logs are submitted in 0 to 10 second time buckets.
# Optimization: All other events should be aggregated using `config._telemetry_heartbeat_interval`.
# Telemetry payloads will be submitted according to `config._telemetry_heartbeat_interval`.
# This ensures metrics and logs are submitted in 10 second time buckets.
if self._is_periodic and force_flush is False:
if self._periodic_count < self._periodic_threshold:
self._periodic_count += 1
Expand Down Expand Up @@ -879,7 +877,7 @@ def _telemetry_excepthook(self, tp, value, root_traceback):
self.add_integration(integration_name, True, error_msg=error_msg)

if self._enabled and not self.started:
self._app_started_event(False)
self.app_started(False)

self.app_shutdown()

Expand Down
6 changes: 2 additions & 4 deletions ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ def write(self, spans=None):
# type: (Optional[List[Span]]) -> None
if not spans:
return

encoded = self.encoder.encode_traces([spans])
self.out.write(encoded + "\n")
self.out.flush()
Expand Down Expand Up @@ -321,6 +320,8 @@ def _send_payload(self, payload, count, client):
return response

def write(self, spans=None):
# Queues an app-started event before the first ci-visibility/llmobs/trace payload is sent
telemetry_writer.app_started()
for client in self._clients:
self._write_with_client(client, spans=spans)
if self._sync_mode:
Expand Down Expand Up @@ -608,9 +609,6 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
if config._telemetry_enabled and not telemetry_writer.started:
telemetry_writer._app_started_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
if os.getenv("AWS_LAMBDA_FUNCTION_NAME") is None and (
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/profiling/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ddtrace.internal import compat
from ddtrace.internal import periodic
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.profiling import _traceback
from ddtrace.profiling import exporter
from ddtrace.settings.profiling import config
Expand Down Expand Up @@ -52,6 +53,8 @@ def flush(self):
# type: (...) -> None
"""Flush events from recorder to exporters."""
LOG.debug("Flushing events")
# Enable telemetry before the first profile is sent
telemetry_writer.app_started()
if self._export_libdd_enabled:
ddup.upload()

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess):
from ddtrace.internal.telemetry import telemetry_writer
# simulate app start event, this occurs when the first span is sent to the datadog agent
telemetry_writer._app_started_event()
telemetry_writer.app_started()
""",
env=env,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/telemetry/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def index():
def starting_app_view():
# We must call app-started before telemetry events can be sent to the agent.
# This endpoint mocks the behavior of the agent writer.
telemetry_writer._app_started_event()
telemetry_writer.app_started()
return "OK", 200


Expand Down
8 changes: 4 additions & 4 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.telemetry import telemetry_writer
telemetry_writer._app_started_event()
telemetry_writer.app_started()
if os.fork() == 0:
# Send multiple started events to confirm none get sent
telemetry_writer._app_started_event()
telemetry_writer._app_started_event()
telemetry_writer._app_started_event()
telemetry_writer.app_started()
telemetry_writer.app_started()
telemetry_writer.app_started()
else:
# Print the parent process runtime id for validation
print(get_runtime_id())
Expand Down
6 changes: 3 additions & 3 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ def test_add_event_disabled_writer(telemetry_writer, test_agent_session):


def test_app_started_event(telemetry_writer, test_agent_session, mock_time):
"""asserts that _app_started_event() queues a valid telemetry request which is then sent by periodic()"""
"""asserts that app_started() queues a valid telemetry request which is then sent by periodic()"""
with override_global_config(dict(_telemetry_dependency_collection=False)):
# queue an app started event
telemetry_writer._app_started_event()
telemetry_writer.app_started()
# force a flush
telemetry_writer.periodic(force_flush=True)

Expand Down Expand Up @@ -422,7 +422,7 @@ def test_update_dependencies_event_not_duplicated(telemetry_writer, test_agent_s
def test_app_closing_event(telemetry_writer, test_agent_session, mock_time):
"""asserts that app_shutdown() queues and sends an app-closing telemetry request"""
# app started event must be queued before any other telemetry event
telemetry_writer._app_started_event(register_app_shutdown=False)
telemetry_writer.app_started(register_app_shutdown=False)
assert telemetry_writer.started
# send app closed event
telemetry_writer.app_shutdown()
Expand Down

0 comments on commit 7b24081

Please sign in to comment.