Skip to content

Commit

Permalink
feat: Update controller in realtime analytics #26 (#27)
Browse files Browse the repository at this point in the history
* feat: Update controller in realtime analytics #26

* fix: CVE-2023-50447

* fix: resolve PR comments

* fix: resolve PR comments

* chore: reformat files

* chore: remove the field model_price

* fix: unify field names along with Core logs

---------

Co-authored-by: Aliaksandr Stsiapanay <[email protected]>
  • Loading branch information
astsiapanay and astsiapanay authored Jan 24, 2024
1 parent 4ec210d commit a35e08e
Show file tree
Hide file tree
Showing 6 changed files with 1,496 additions and 909 deletions.
56 changes: 51 additions & 5 deletions aidial_analytics_realtime/analytics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from decimal import Decimal
from enum import Enum
from logging import Logger
from typing import Awaitable, Callable
Expand Down Expand Up @@ -66,6 +67,8 @@ def make_point(
usage: dict | None,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
parent_deployment: str | None,
trace: dict | None,
):
topic = None
response_content = ""
Expand All @@ -90,14 +93,29 @@ def make_point(
else "\n\n".join(request["input"])
)

price = rates_calculator.calculate_price(
deployment, model, request_content, response_content, usage
)
price = Decimal(0)
if usage is not None and usage.get("price") is not None:
price = usage["price"]
else:
price = rates_calculator.calculate_price(
deployment, model, request_content, response_content, usage
)

point = (
Point("analytics")
.tag("model", model)
.tag("deployment", deployment)
.tag("parent_deployment", to_string(parent_deployment))
.tag("trace_id", "undefined" if not trace else trace["trace_id"])
.tag(
"core_span_id", "undefined" if not trace else trace["core_span_id"]
)
.tag(
"core_parent_span_id",
"undefined"
if not trace
else to_string(trace.get("core_parent_span_id", None)),
)
.tag("project_id", project_id)
.tag(
"language",
Expand Down Expand Up @@ -177,12 +195,34 @@ async def on_message(
type: RequestType,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
token_usage: dict | None,
parent_deployment: str | None,
trace: dict | None,
):
logger.info(f"Chat completion response length {len(response)}")

usage_per_model = await parse_usage_per_model(response)

if len(usage_per_model) == 0:
if token_usage is not None:
point = make_point(
deployment,
model,
project_id,
chat_id,
upstream_url,
user_hash,
user_title,
timestamp,
request,
response,
type,
token_usage,
topic_model,
rates_calculator,
parent_deployment,
trace,
)
await influx_writer(point)
elif len(usage_per_model) == 0:
point = make_point(
deployment,
model,
Expand All @@ -198,6 +238,8 @@ async def on_message(
response.get("usage", None),
topic_model,
rates_calculator,
parent_deployment,
trace,
)
await influx_writer(point)
else:
Expand All @@ -216,6 +258,8 @@ async def on_message(
None,
topic_model,
rates_calculator,
parent_deployment,
trace,
)
await influx_writer(point)

Expand All @@ -235,5 +279,7 @@ async def on_message(
usage,
topic_model,
rates_calculator,
parent_deployment,
trace,
)
await influx_writer(point)
22 changes: 22 additions & 0 deletions aidial_analytics_realtime/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ async def on_chat_completion_message(
influx_writer: InfluxWriterAsync,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
token_usage: dict | None,
parent_deployment: str | None,
trace: dict | None,
):
if response["status"] != "200":
return
Expand Down Expand Up @@ -118,6 +121,9 @@ async def on_chat_completion_message(
RequestType.CHAT_COMPLETION,
topic_model,
rates_calculator,
token_usage,
parent_deployment,
trace,
)


Expand All @@ -134,6 +140,9 @@ async def on_embedding_message(
influx_writer: InfluxWriterAsync,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
token_usage: dict | None,
parent_deployment: str | None,
trace: dict | None,
):
if response["status"] != "200":
return
Expand All @@ -154,6 +163,9 @@ async def on_embedding_message(
RequestType.EMBEDDING,
topic_model,
rates_calculator,
token_usage,
parent_deployment,
trace,
)


Expand All @@ -176,6 +188,10 @@ async def on_log_message(

timestamp = parse_time(request["time"])

token_usage = message.get("token_usage", None)
trace = message.get("trace", None)
parent_deployment = message.get("parent_deployment", None)

match = re.search(RATE_PATTERN, uri)
if match:
await on_rate_message(request, response)
Expand All @@ -196,6 +212,9 @@ async def on_log_message(
influx_writer,
topic_model,
rates_calculator,
token_usage,
parent_deployment,
trace,
)

match = re.search(EMBEDDING_PATTERN, uri)
Expand All @@ -214,6 +233,9 @@ async def on_log_message(
influx_writer,
topic_model,
rates_calculator,
token_usage,
parent_deployment,
trace,
)


Expand Down
Loading

0 comments on commit a35e08e

Please sign in to comment.