Skip to content

Commit

Permalink
Merge branch 'develop-openai-instrumentation' into feature-openai-err…
Browse files Browse the repository at this point in the history
…or-traces
  • Loading branch information
mergify[bot] authored Oct 19, 2023
2 parents 97cfc40 + 25729d3 commit 98a0911
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 42 deletions.
47 changes: 36 additions & 11 deletions newrelic/core/otlp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging

from newrelic.api.time_trace import get_service_linking_metadata
from newrelic.common.encoding_utils import json_encode
from newrelic.core.config import global_settings
from newrelic.core.stats_engine import CountStats, TimeStats
Expand Down Expand Up @@ -124,8 +125,11 @@ def create_key_values_from_iterable(iterable):
)


def create_resource(attributes=None):
def create_resource(attributes=None, attach_apm_entity=True):
attributes = attributes or {"instrumentation.provider": "newrelic-opentelemetry-python-ml"}
if attach_apm_entity:
metadata = get_service_linking_metadata()
attributes.update(metadata)
return Resource(attributes=create_key_values_from_iterable(attributes))


Expand Down Expand Up @@ -203,7 +207,7 @@ def stats_to_otlp_metrics(metric_data, start_time, end_time):


def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=None):
resource = resource or create_resource()
resource = resource or create_resource(attach_apm_entity=False)
return MetricsData(
resource_metrics=[
ResourceMetrics(
Expand All @@ -220,24 +224,45 @@ def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=N


def encode_ml_event_data(custom_event_data, agent_run_id):
resource = create_resource()
ml_events = []
# An InferenceEvent is attached to a separate ML Model entity instead
# of the APM entity.
ml_inference_events = []
ml_apm_events = []
for event in custom_event_data:
event_info, event_attrs = event
event_type = event_info["type"]
event_attrs.update(
{
"real_agent_id": agent_run_id,
"event.domain": "newrelic.ml_events",
"event.name": event_info["type"],
"event.name": event_type,
}
)
ml_attrs = create_key_values_from_iterable(event_attrs)
unix_nano_timestamp = event_info["timestamp"] * 1e6
ml_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
if event_type == "InferenceEvent":
ml_inference_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
)
else:
ml_apm_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
)

resource_logs = []
if ml_inference_events:
inference_resource = create_resource(attach_apm_entity=False)
resource_logs.append(
ResourceLogs(resource=inference_resource, scope_logs=[ScopeLogs(log_records=ml_inference_events)])
)
if ml_apm_events:
apm_resource = create_resource()
resource_logs.append(ResourceLogs(resource=apm_resource, scope_logs=[ScopeLogs(log_records=ml_apm_events)]))

return LogsData(resource_logs=[ResourceLogs(resource=resource, scope_logs=[ScopeLogs(log_records=ml_events)])])
return LogsData(resource_logs=resource_logs)
133 changes: 129 additions & 4 deletions tests/agent_features/test_ml_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,94 @@ def core_app(collector_agent_registration):


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_inside_transaction(core_app):
def test_ml_event_payload_noninference_event_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("MyCustomEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_inference_event_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("InferenceEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
],
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
],
}
)
@reset_core_stats_engine()
def test_ml_event_payload_both_events_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("InferenceEvent", {"foo": "bar"})
record_ml_event("MyCustomEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
{
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_outside_transaction(core_app):
def test_ml_event_payload_inference_event_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
Expand All @@ -83,6 +154,59 @@ def _test():
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
],
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
],
}
)
@reset_core_stats_engine()
def test_ml_event_payload_both_events_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_noninference_event_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)

_test()
core_app.harvest()


@pytest.mark.parametrize(
"params,expected",
[
Expand Down Expand Up @@ -151,6 +275,7 @@ def test_record_ml_event_outside_transaction_params_not_a_dict():

# Tests for ML Events configuration settings


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@validate_ml_event_count(count=0)
Expand Down
82 changes: 55 additions & 27 deletions tests/testing_support/validators/validate_ml_event_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,36 @@ def payload_to_ml_events(payload):
else:
message = payload

resource_logs = message.get("resource_logs")
assert len(resource_logs) == 1
resource_logs = resource_logs[0]
resource = resource_logs.get("resource")
assert resource and resource.get("attributes")[0] == {
"key": "instrumentation.provider",
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
}
scope_logs = resource_logs.get("scope_logs")
assert len(scope_logs) == 1
scope_logs = scope_logs[0]

scope = scope_logs.get("scope")
assert scope is None
logs = scope_logs.get("log_records")

return logs
inference_logs = []
apm_logs = []
resource_log_records = message.get("resource_logs")
for resource_logs in resource_log_records:
resource = resource_logs.get("resource")
assert resource
resource_attrs = resource.get("attributes")
assert {
"key": "instrumentation.provider",
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
} in resource_attrs
scope_logs = resource_logs.get("scope_logs")
assert len(scope_logs) == 1
scope_logs = scope_logs[0]

scope = scope_logs.get("scope")
assert scope is None
logs = scope_logs.get("log_records")
event_name = get_event_name(logs)
if event_name == "InferenceEvent":
inference_logs = logs
else:
# Make sure apm entity attrs are present on the resource.
expected_apm_keys = ("entity.type", "entity.name", "entity.guid", "hostname", "instrumentation.provider")
assert all(attr["key"] in expected_apm_keys for attr in resource_attrs)
assert all(attr["value"] not in ("", None) for attr in resource_attrs)

apm_logs = logs

return inference_logs, apm_logs


def validate_ml_event_payload(ml_events=None):
Expand Down Expand Up @@ -86,19 +99,34 @@ def _bind_params(method, payload=(), *args, **kwargs):
assert recorded_ml_events

decoded_payloads = [payload_to_ml_events(payload) for payload in recorded_ml_events]
all_logs = []
for sent_logs in decoded_payloads:
for data_point in sent_logs:
for key in ("time_unix_nano",):
assert key in data_point, "Invalid log format. Missing key: %s" % key
decoded_inference_payloads = [payload[0] for payload in decoded_payloads]
decoded_apm_payloads = [payload[1] for payload in decoded_payloads]
all_apm_logs = normalize_logs(decoded_apm_payloads)
all_inference_logs = normalize_logs(decoded_inference_payloads)

for expected_event in ml_events.get("inference", []):
assert expected_event in all_inference_logs, "%s Not Found. Got: %s" % (expected_event, all_inference_logs)

for expected_event in ml_events.get("apm", []):
assert expected_event in all_apm_logs, "%s Not Found. Got: %s" % (expected_event, all_apm_logs)
return val

return _validate_wrapper


def normalize_logs(decoded_payloads):
all_logs = []
for sent_logs in decoded_payloads:
for data_point in sent_logs:
for key in ("time_unix_nano",):
assert key in data_point, "Invalid log format. Missing key: %s" % key
all_logs.append(
{attr["key"]: attribute_to_value(attr["value"]) for attr in (data_point.get("attributes") or [])}
)
return all_logs

for expected_event in ml_events:
assert expected_event in all_logs, "%s Not Found. Got: %s" % (expected_event, all_logs)

return val

return _validate_wrapper
def get_event_name(logs):
for attr in logs[0]["attributes"]:
if attr["key"] == "event.name":
return attr["value"]["string_value"]

0 comments on commit 98a0911

Please sign in to comment.