Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahbenizzy committed Aug 26, 2024
1 parent 1b30a92 commit 77ee10f
Show file tree
Hide file tree
Showing 72 changed files with 1,490 additions and 1,264 deletions.
8 changes: 4 additions & 4 deletions burr/cli/demo_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ def generate_rag_data(
def generate_all(
data_dir: Optional[str] = None, s3_bucket: Optional[str] = None, unique_app_names: bool = False
):
logger.info("Generating chatbot data")
generate_chatbot_data(
data_dir=data_dir, s3_bucket=s3_bucket, use_traces=False, unique_app_names=unique_app_names
)
logger.info("Generating chatbot data with traces")
generate_chatbot_data(
data_dir=data_dir, s3_bucket=s3_bucket, use_traces=True, unique_app_names=unique_app_names
)
logger.info("Generating chatbot data")
generate_chatbot_data(
data_dir=data_dir, s3_bucket=s3_bucket, use_traces=False, unique_app_names=unique_app_names
)
logger.info("Generating counter data")
generate_counter_data(data_dir=data_dir, s3_bucket=s3_bucket, unique_app_names=unique_app_names)
logger.info("Generating RAG data")
Expand Down
61 changes: 61 additions & 0 deletions burr/integrations/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataclasses
import datetime
import json
import logging
import random
Expand Down Expand Up @@ -232,8 +233,68 @@ class OpenTelemetryTracker(
Note that this globally sets a tracer provider -- it is possible that this will interfere with
other tracers, and we are actively investigating it.
TODO -- add stream start/end to opentel + TTFS, etc...
"""

def pre_start_stream(
self,
*,
action: str,
sequence_id: int,
app_id: str,
partition_key: Optional[str],
**future_kwargs: Any,
):
return self.burr_tracker.pre_start_stream(
action=action,
sequence_id=sequence_id,
app_id=app_id,
partition_key=partition_key,
**future_kwargs,
)

def post_stream_item(
self,
*,
item: Any,
item_index: int,
stream_initialize_time: datetime.datetime,
first_stream_item_start_time: datetime.datetime,
action: str,
sequence_id: int,
app_id: str,
partition_key: Optional[str],
**future_kwargs: Any,
):
return self.burr_tracker.post_stream_item(
item=item,
item_index=item_index,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_item_start_time,
action=action,
sequence_id=sequence_id,
app_id=app_id,
partition_key=partition_key,
**future_kwargs,
)

def post_end_stream(
self,
*,
action: str,
sequence_id: int,
app_id: str,
partition_key: Optional[str],
**future_kwargs: Any,
):
return self.burr_tracker.post_end_stream(
action=action,
sequence_id=sequence_id,
app_id=app_id,
partition_key=partition_key,
**future_kwargs,
)

def __init__(self, burr_tracker: SyncTrackingClient):
initialize_tracer()
self.tracer = trace.get_tracer("burr.integrations.opentelemetry")
Expand Down
10 changes: 9 additions & 1 deletion burr/tracking/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
PreRunStepHook,
PreStartSpanHook,
)
from burr.lifecycle.base import DoLogAttributeHook
from burr.lifecycle.base import (
DoLogAttributeHook,
PostEndStreamHook,
PostStreamItemHook,
PreStartStreamHook,
)


class SyncTrackingClient(
Expand All @@ -17,6 +22,9 @@ class SyncTrackingClient(
PreStartSpanHook,
PostEndSpanHook,
DoLogAttributeHook,
PreStartStreamHook,
PostStreamItemHook,
PostEndStreamHook,
abc.ABC,
):
"""Base class for synchronous tracking clients. All tracking clients must implement from this
Expand Down
1 change: 1 addition & 0 deletions burr/tracking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ def do_log_attributes(
span_id=span.uid if span is not None else None,
value=serde.serialize(attribute, **self.serde_kwargs),
tags=tags,
time_logged=system.now(),
)
self._append_write_line(attribute_model)

Expand Down
1 change: 1 addition & 0 deletions burr/tracking/common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class AttributeModel(IdentifyingModel):
] # It doesn't have to relate to a span, it can be at the level of an action as well
value: Union[dict, str, int, float, bool, list, None]
tags: Dict[str, str]
time_logged: Optional[datetime.datetime] = None
type: str = "attribute"

@property
Expand Down
2 changes: 2 additions & 0 deletions burr/tracking/s3client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pydantic

from burr import system
from burr.common import types as burr_types
from burr.core import Action, ApplicationGraph, State, serde
from burr.integrations.base import require_plugin
Expand Down Expand Up @@ -137,6 +138,7 @@ def do_log_attributes(
span_id=span.uid if span is not None else None,
value=serde.serialize(attribute, **self.serde_kwargs),
tags=tags,
time_logged=system.now(),
)
self.submit_log_event(attribute_model, app_id=app_id, partition_key=partition_key)

Expand Down

This file was deleted.

Loading

0 comments on commit 77ee10f

Please sign in to comment.