fix: streamed function/tool calls
hassiebp committed Jun 20, 2024
1 parent 9e55fb0 commit 99a6944
Showing 2 changed files with 192 additions and 43 deletions.
109 changes: 69 additions & 40 deletions langfuse/openai.py
@@ -20,6 +20,8 @@
 import copy
 import logging
 import types
+
+from collections import defaultdict
 from typing import List, Optional
 
 from packaging.version import Version
@@ -318,12 +320,12 @@ def _create_langfuse_update(
     generation.update(**update)
 
 
-def _extract_openai_response(resource, responses):
-    completion = [] if resource.type == "chat" else ""
+def _extract_streamed_openai_response(resource, chunks):
+    completion = defaultdict(str) if resource.type == "chat" else ""
     model = None
     completion_start_time = None
 
-    for index, i in enumerate(responses):
+    for index, i in enumerate(chunks):
         if index == 0:
             completion_start_time = _get_timestamp()
@@ -344,48 +346,70 @@ def _extract_openai_response(resource, responses):
                     delta = delta.__dict__
 
                 if delta.get("role", None) is not None:
-                    completion.append(
-                        {
-                            "role": delta.get("role", None),
-                            "function_call": None,
-                            "tool_calls": None,
-                            "content": None,
-                        }
-                    )
+                    completion["role"] = delta["role"]

-                elif delta.get("content", None) is not None:
-                    completion[-1]["content"] = (
+                if delta.get("content", None) is not None:
+                    completion["content"] = (
                         delta.get("content", None)
-                        if completion[-1]["content"] is None
-                        else completion[-1]["content"] + delta.get("content", None)
+                        if completion["content"] is None
+                        else completion["content"] + delta.get("content", None)
                     )

elif delta.get("function_call", None) is not None:
completion[-1]["function_call"] = (
delta.get("function_call", None)
if completion[-1]["function_call"] is None
else completion[-1]["function_call"]
+ delta.get("function_call", None)
)
elif delta.get("tools_call", None) is not None:
completion[-1]["tool_calls"] = (
delta.get("tools_call", None)
if completion[-1]["tool_calls"] is None
else completion[-1]["tool_calls"]
+ delta.get("tools_call", None)
curr = completion["function_call"]
tool_call_chunk = delta.get("function_call", None)

if not curr:
completion["function_call"] = {
"name": getattr(tool_call_chunk, "name", ""),
"arguments": getattr(tool_call_chunk, "arguments", ""),
}

else:
curr["name"] = curr["name"] or getattr(
tool_call_chunk, "name", None
)
curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

elif delta.get("tool_calls", None) is not None:
curr = completion["tool_calls"]
tool_call_chunk = getattr(
delta.get("tool_calls", None)[0], "function", None
)

if not curr:
completion["tool_calls"] = {
"name": getattr(tool_call_chunk, "name", ""),
"arguments": getattr(tool_call_chunk, "arguments", ""),
}

else:
curr["name"] = curr["name"] or getattr(
tool_call_chunk, "name", None
)
curr["arguments"] += getattr(tool_call_chunk, "arguments", None)

             if resource.type == "completion":
                 completion += choice.get("text", None)

     def get_response_for_chat():
-        if len(completion) > 0:
-            if completion[-1].get("content", None) is not None:
-                return completion[-1]["content"]
-            elif completion[-1].get("function_call", None) is not None:
-                return completion[-1]["function_call"]
-            elif completion[-1].get("tool_calls", None) is not None:
-                return completion[-1]["tool_calls"]
-        return None
+        return (
+            completion["content"]
+            or (
+                completion["function_call"]
+                and {
+                    "role": "assistant",
+                    "function_call": completion["function_call"],
+                }
+            )
+            or (
+                completion["tool_calls"]
+                and {
+                    "role": "assistant",
+                    "tool_calls": [{"function": completion["tool_calls"]}],
+                }
+            )
+            or None
+        )

     return (
         model,
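
To see the accumulation pattern of the hunk above in isolation, here is a self-contained sketch. It uses plain dicts for the deltas instead of the SDK's pydantic objects (hence .get where the committed code uses getattr), and the chunk contents are invented:

    from collections import defaultdict

    # Invented streamed deltas, shaped like OpenAI chat chunks after __dict__.
    chunks = [
        {"role": "assistant"},
        {"function_call": {"name": "get_weather", "arguments": ""}},
        {"function_call": {"arguments": '{"location": '}},
        {"function_call": {"arguments": '"Boston"}'}},
    ]

    completion = defaultdict(str)

    for delta in chunks:
        if delta.get("role") is not None:
            completion["role"] = delta["role"]
        elif delta.get("function_call") is not None:
            chunk = delta["function_call"]
            curr = completion["function_call"]
            if not curr:  # first fragment seeds name and arguments
                completion["function_call"] = {
                    "name": chunk.get("name", ""),
                    "arguments": chunk.get("arguments", ""),
                }
            else:  # later fragments only append argument text
                curr["name"] = curr["name"] or chunk.get("name")
                curr["arguments"] += chunk.get("arguments", "")

    print(completion["function_call"])
    # {'name': 'get_weather', 'arguments': '{"location": "Boston"}'}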
@@ -593,8 +617,13 @@ def _filter_image_data(messages: List[dict]):
     output_messages = copy.deepcopy(messages)
 
     for message in output_messages:
-        if message.get("content", None) is not None:
-            content = message["content"]
+        content = (
+            message.get("content", None)
+            if isinstance(message, dict)
+            else getattr(message, "content", None)
+        )
+
+        if content is not None:
             for index, item in enumerate(content):
                 if isinstance(item, dict) and item.get("image_url", None) is not None:
                     url = item["image_url"]["url"]
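
The hunk above makes _filter_image_data tolerant of both plain dict messages and object-style messages such as ChatCompletionMessage, which the updated tests now pass in. The lookup pattern in isolation, with made-up inputs:

    from types import SimpleNamespace

    def get_content(message):
        # Dicts answer via .get; objects (e.g. pydantic models) via getattr.
        return (
            message.get("content", None)
            if isinstance(message, dict)
            else getattr(message, "content", None)
        )

    print(get_content({"role": "user", "content": "hi"}))           # hi
    print(get_content(SimpleNamespace(role="user", content="yo")))  # yo
    print(get_content(SimpleNamespace(role="user")))                # None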
Expand Down Expand Up @@ -638,7 +667,7 @@ def __exit__(self, exc_type, exc_value, traceback):
         pass
 
     def _finalize(self):
-        model, completion_start_time, completion = _extract_openai_response(
+        model, completion_start_time, completion = _extract_streamed_openai_response(
             self.resource, self.items
         )
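
_finalize runs once the wrapped stream is exhausted: the wrapper records each chunk in self.items while re-yielding it, then reconstructs the full completion in one pass. A stripped-down sketch of that shape — the class name and the generation.update call are simplified stand-ins, only _extract_streamed_openai_response comes from this diff:

    class StreamWrapperSketch:
        def __init__(self, stream, resource, generation):
            self.stream = stream          # the raw OpenAI stream
            self.resource = resource      # resource descriptor ("chat"/"completion")
            self.generation = generation  # Langfuse generation to update at the end
            self.items = []

        def __iter__(self):
            for chunk in self.stream:
                self.items.append(chunk)  # keep every chunk for later extraction
                yield chunk
            self._finalize()  # stream exhausted: assemble the completion

        def _finalize(self):
            model, completion_start_time, completion = (
                _extract_streamed_openai_response(self.resource, self.items)
            )
            self.generation.update(model=model, output=completion)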

@@ -685,7 +714,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):
         pass
 
     async def _finalize(self):
-        model, completion_start_time, completion = _extract_openai_response(
+        model, completion_start_time, completion = _extract_streamed_openai_response(
             self.resource, self.items
         )

126 changes: 123 additions & 3 deletions tests/test_openai.py
@@ -12,6 +12,8 @@
     _filter_image_data,
     openai,
 )
+from openai.types.chat.chat_completion import ChatCompletionMessage
+
 from tests.utils import create_uuid, get_api
 
 chat_func = (
@@ -32,7 +34,12 @@ def test_openai_chat_completion():
     completion = chat_func(
         name=generation_name,
         model="gpt-3.5-turbo",
-        messages=[{"role": "user", "content": "1 + 1 = "}],
+        messages=[
+            ChatCompletionMessage(
+                role="assistant", content="You are an expert mathematician"
+            ),
+            {"role": "user", "content": "1 + 1 = "},
+        ],
         temperature=0,
         metadata={"someKey": "someResponse"},
     )
@@ -45,7 +52,15 @@
     assert generation.data[0].name == generation_name
     assert generation.data[0].metadata == {"someKey": "someResponse"}
     assert len(completion.choices) != 0
-    assert generation.data[0].input == [{"content": "1 + 1 = ", "role": "user"}]
+    assert generation.data[0].input == [
+        {
+            "content": "You are an expert mathematician",
+            "function_call": None,
+            "role": "assistant",
+            "tool_calls": None,
+        },
+        {"content": "1 + 1 = ", "role": "user"},
+    ]
     assert generation.data[0].type == "GENERATION"
     assert generation.data[0].model == "gpt-3.5-turbo-0125"
     assert generation.data[0].start_time is not None
@@ -65,7 +80,15 @@ def test_openai_chat_completion():
     assert generation.data[0].output["role"] == "assistant"
 
     trace = api.trace.get(generation.data[0].trace_id)
-    assert trace.input == [{"role": "user", "content": "1 + 1 = "}]
+    assert trace.input == [
+        {
+            "content": "You are an expert mathematician",
+            "function_call": None,
+            "role": "assistant",
+            "tool_calls": None,
+        },
+        {"role": "user", "content": "1 + 1 = "},
+    ]
     assert trace.output["content"] == completion.choices[0].message.content
     assert trace.output["role"] == completion.choices[0].message.role

@@ -754,6 +777,47 @@ class StepByStepAIResponse(BaseModel):
     assert output["title"] is not None
 
 
+def test_openai_function_call_streamed():
+    from typing import List
+
+    from pydantic import BaseModel
+
+    api = get_api()
+    generation_name = create_uuid()
+
+    class StepByStepAIResponse(BaseModel):
+        title: str
+        steps: List[str]
+
+    response = openai.chat.completions.create(
+        name=generation_name,
+        model="gpt-3.5-turbo-0613",
+        messages=[{"role": "user", "content": "Explain how to assemble a PC"}],
+        functions=[
+            {
+                "name": "get_answer_for_user_query",
+                "description": "Get user answer in series of steps",
+                "parameters": StepByStepAIResponse.schema(),
+            }
+        ],
+        function_call={"name": "get_answer_for_user_query"},
+        stream=True,
+    )
+
+    # Consume the stream
+    for _ in response:
+        pass
+
+    openai.flush_langfuse()
+
+    generation = api.observations.get_many(name=generation_name, type="GENERATION")
+
+    assert len(generation.data) != 0
+    assert generation.data[0].name == generation_name
+    assert generation.data[0].output is not None
+    assert "function_call" in generation.data[0].output

 def test_openai_tool_call():
     api = get_api()
     generation_name = create_uuid()
@@ -806,6 +870,62 @@ def test_openai_tool_call():
     assert generation.data[0].input["messages"] == messages
 
 
+def test_openai_tool_call_streamed():
+    api = get_api()
+    generation_name = create_uuid()
+
+    tools = [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_current_weather",
+                "description": "Get the current weather in a given location",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "location": {
+                            "type": "string",
+                            "description": "The city and state, e.g. San Francisco, CA",
+                        },
+                        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
+                    },
+                    "required": ["location"],
+                },
+            },
+        }
+    ]
+    messages = [{"role": "user", "content": "What's the weather like in Boston today?"}]
+    response = openai.chat.completions.create(
+        model="gpt-3.5-turbo",
+        messages=messages,
+        tools=tools,
+        tool_choice="required",
+        name=generation_name,
+        stream=True,
+    )
+
+    # Consume the stream
+    for _ in response:
+        pass
+
+    openai.flush_langfuse()
+
+    generation = api.observations.get_many(name=generation_name, type="GENERATION")
+
+    assert len(generation.data) != 0
+    assert generation.data[0].name == generation_name
+
+    assert (
+        generation.data[0].output["tool_calls"][0]["function"]["name"]
+        == "get_current_weather"
+    )
+    assert (
+        generation.data[0].output["tool_calls"][0]["function"]["arguments"] is not None
+    )
+    assert generation.data[0].input["tools"] == tools
+    assert generation.data[0].input["messages"] == messages
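
The streamed tests above only drain the stream and let the integration reassemble the call. Application code that needs the arguments itself can apply the same append-only accumulation before parsing; a small sketch with invented fragments:

    import json

    # Fragments as they might arrive across streamed tool-call deltas (invented).
    fragments = ['{"location": "Bos', 'ton, MA", "unit": ', '"celsius"}']

    arguments = ""
    for piece in fragments:
        arguments += piece  # the same append-only accumulation the extractor does

    parsed = json.loads(arguments)
    print(parsed["location"])  # Boston, MA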


 def test_azure():
     api = get_api()
     generation_name = create_uuid()
