Skip to content

Commit

Permalink
Add basic tracing for InvokeModelWithResponseStream
Browse files Browse the repository at this point in the history
  • Loading branch information
xrmx committed Jan 24, 2025
1 parent bd8a50f commit 611578a
Show file tree
Hide file tree
Showing 10 changed files with 716 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Available examples
- `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`.
- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html>_`.
- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html>_`.
- `invoke_model_stream.py` uses `bedrock-runtime` `InvokeModelWithResponseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModelWithResponseStream.html>_`.

Setup
-----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json
import os

import boto3


def main():
    """Stream a chat completion from Amazon Bedrock and print the answer.

    The model is chosen via the CHAT_MODEL environment variable (default:
    amazon.titan-text-lite-v1). Supported families: amazon.titan,
    amazon.nova and anthropic.claude.

    Raises:
        ValueError: if CHAT_MODEL names an unsupported model family.
    """
    chat_model = os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1")
    prompt = "Write a short poem on OpenTelemetry."
    # Each model family expects a different request body schema.
    if "amazon.titan" in chat_model:
        body = {
            "inputText": prompt,
            "textGenerationConfig": {},
        }
    elif "amazon.nova" in chat_model:
        body = {
            "messages": [{"role": "user", "content": [{"text": prompt}]}],
            "schemaVersion": "messages-v1",
        }
    elif "anthropic.claude" in chat_model:
        body = {
            "messages": [
                {"role": "user", "content": [{"text": prompt, "type": "text"}]}
            ],
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 200,
        }
    else:
        raise ValueError(f"Unsupported CHAT_MODEL: {chat_model!r}")
    client = boto3.client("bedrock-runtime")
    response = client.invoke_model_with_response_stream(
        modelId=chat_model,
        body=json.dumps(body),
    )

    answer = ""
    for event in response["body"]:
        json_bytes = event.get("chunk", {}).get("bytes", b"")
        if not json_bytes:
            # non-chunk events (or empty payloads) carry no text
            continue
        chunk = json.loads(json_bytes.decode("utf-8"))
        if "outputText" in chunk:
            # amazon.titan text chunk
            answer += chunk["outputText"]
        elif "completion" in chunk:
            # legacy anthropic.claude text-completions chunk
            answer += chunk["completion"]
        elif "contentBlockDelta" in chunk:
            # amazon.nova delta chunk
            answer += chunk["contentBlockDelta"]["delta"]["text"]
        elif chunk.get("type") == "content_block_delta":
            # anthropic.claude messages-API delta chunk, e.g.
            # {'type': 'content_block_delta', 'index': 0,
            #  'delta': {'type': 'text_delta', 'text': 'Here'}}
            answer += chunk["delta"].get("text", "")
    print(answer)


if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
InvokeModelWithResponseStreamWrapper,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
Expand Down Expand Up @@ -66,8 +67,16 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
Amazon Bedrock Runtime</a>.
"""

_HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"}
_HANDLED_OPERATIONS = {
"Converse",
"ConverseStream",
"InvokeModel",
"InvokeModelWithResponseStream",
}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {
"ConverseStream",
"InvokeModelWithResponseStream",
}

def should_end_span_on_exit(self):
return (
Expand Down Expand Up @@ -288,6 +297,20 @@ def stream_done_callback(response):
# InvokeModel
if "body" in result and isinstance(result["body"], StreamingBody):
self._invoke_model_on_success(span, result, model_id)
return

# InvokeModelWithResponseStream
if "body" in result and isinstance(result["body"], EventStream):

def invoke_model_stream_done_callback(response):
# the callback gets data formatted as the simpler converse API
self._converse_on_success(span, response)
span.end()

result["body"] = InvokeModelWithResponseStreamWrapper(
result["body"], invoke_model_stream_done_callback, model_id
)
return

# pylint: disable=no-self-use
def _handle_amazon_titan_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from __future__ import annotations

import json

from botocore.eventstream import EventStream
from wrapt import ObjectProxy

Expand Down Expand Up @@ -46,20 +48,21 @@ def __iter__(self):
def _process_event(self, event):
if "messageStart" in event:
# {'messageStart': {'role': 'assistant'}}
pass
return

if "contentBlockDelta" in event:
# {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
pass
return

if "contentBlockStop" in event:
# {'contentBlockStop': {'contentBlockIndex': 0}}
pass
return

if "messageStop" in event:
# {'messageStop': {'stopReason': 'end_turn'}}
if stop_reason := event["messageStop"].get("stopReason"):
self._response["stopReason"] = stop_reason
return

if "metadata" in event:
# {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}}
Expand All @@ -72,3 +75,135 @@ def _process_event(self, event):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
return


# pylint: disable=abstract-method
class InvokeModelWithResponseStreamWrapper(ObjectProxy):
    """Wrapper for botocore.eventstream.EventStream"""

    def __init__(
        self,
        stream: EventStream,
        stream_done_callback,
        model_id: str,
    ):
        super().__init__(stream)

        self._stream_done_callback = stream_done_callback
        self._model_id = model_id

        # Accumulates data in the same shape the Converse API returns it:
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
        """Yield the wrapped events unchanged while inspecting each one."""
        for event in self.__wrapped__:
            self._process_event(event)
            yield event

    def _process_event(self, event):
        """Decode a raw stream event and route it to the model-specific parser."""
        if "chunk" not in event:
            return

        raw = event["chunk"].get("bytes", b"")
        try:
            payload = json.loads(raw.decode("utf-8"))
        except json.JSONDecodeError:
            # malformed chunk: nothing useful to record
            return

        model = self._model_id
        if "amazon.titan" in model:
            self._process_amazon_titan_chunk(payload)
        elif "amazon.nova" in model:
            self._process_amazon_nova_chunk(payload)
        elif "anthropic.claude" in model:
            self._process_anthropic_claude_chunk(payload)

    def _process_invocation_metrics(self, invocation_metrics):
        """Copy bedrock invocation metrics into the Converse-shaped response."""
        usage = {}
        input_tokens = invocation_metrics.get("inputTokenCount")
        if input_tokens:
            usage["inputTokens"] = input_tokens

        output_tokens = invocation_metrics.get("outputTokenCount")
        if output_tokens:
            usage["outputTokens"] = output_tokens
        self._response["usage"] = usage

    def _process_amazon_titan_chunk(self, chunk):
        """Handle one amazon.titan chunk; fires the callback on the final one."""
        stop_reason = chunk.get("completionReason")
        if stop_reason is not None:
            self._response["stopReason"] = stop_reason

        # The final chunk carries the invocation metrics, e.g.
        # "amazon-bedrock-invocationMetrics":{
        #   "inputTokenCount":9,"outputTokenCount":128,"invocationLatency":3569,"firstByteLatency":2180
        # }
        metrics = chunk.get("amazon-bedrock-invocationMetrics")
        if metrics:
            self._process_invocation_metrics(metrics)
            self._stream_done_callback(self._response)

    def _process_amazon_nova_chunk(self, chunk):
        """Handle one amazon.nova chunk; fires the callback on 'metadata'."""
        # messageStart / contentBlockDelta / contentBlockStop events carry
        # no data we record, so only the two interesting shapes are checked.
        if "messageStop" in chunk:
            # {'messageStop': {'stopReason': 'end_turn'}}
            stop_reason = chunk["messageStop"].get("stopReason")
            if stop_reason:
                self._response["stopReason"] = stop_reason
        elif "metadata" in chunk:
            # {'metadata': {'usage': {'inputTokens': 8, 'outputTokens': 117}, 'metrics': {}, 'trace': {}}}
            usage = chunk["metadata"].get("usage")
            if usage:
                token_counts = {}
                input_tokens = usage.get("inputTokens")
                if input_tokens:
                    token_counts["inputTokens"] = input_tokens

                output_tokens = usage.get("outputTokens")
                if output_tokens:
                    token_counts["outputTokens"] = output_tokens
                self._response["usage"] = token_counts

            self._stream_done_callback(self._response)

    def _process_anthropic_claude_chunk(self, chunk):
        """Handle one anthropic.claude chunk; fires the callback on 'message_stop'."""
        message_type = chunk.get("type")
        if not message_type:
            return

        # message_start / content_block_start / content_block_delta /
        # content_block_stop events carry no data we record.
        if message_type == "message_delta":
            # {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': 123}}
            stop_reason = chunk.get("delta", {}).get("stop_reason")
            if stop_reason is not None:
                self._response["stopReason"] = stop_reason
        elif message_type == "message_stop":
            # {'type': 'message_stop', 'amazon-bedrock-invocationMetrics': {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
            metrics = chunk.get("amazon-bedrock-invocationMetrics")
            if metrics:
                self._process_invocation_metrics(metrics)
                self._stream_done_callback(self._response)
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def assert_converse_completion_attributes(
)


def assert_converse_stream_completion_attributes(
def assert_stream_completion_attributes(
span: ReadableSpan,
request_model: str,
input_tokens: int | None = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
interactions:
- request:
body: null
headers:
Content-Length:
- '0'
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjRUMTM0NDM5Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTFlMjljM2Y1LTU2MzZhOWI4MmViYTYxOTFiOTcwOTI2YTtQYXJlbnQ9NzA1NzBlZjUy
YzJkZjliYjtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
ZDg2MjFlMzAtNTk3Yi00ZWM3LWJlNGEtMThkMDQwZTRhMzcw
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/invoke-with-response-stream
response:
body:
string: '{"message":"The provided model identifier is invalid."}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json
Date:
- Fri, 24 Jan 2025 13:44:40 GMT
Set-Cookie: test_set_cookie
x-amzn-ErrorType:
- ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/
x-amzn-RequestId:
- 6460a108-875d-4e26-bcdf-f03c4c815f74
status:
code: 400
message: Bad Request
version: 1
Loading

0 comments on commit 611578a

Please sign in to comment.