
Library for uploading blobs as part of instrumentation #GenAI #MultiModal #3065

Open
michaelsafyan opened this issue Dec 3, 2024 · 17 comments
Labels: feature-request, gen-ai (Related to generative AI)

Comments

@michaelsafyan

michaelsafyan commented Dec 3, 2024

What problem do you want to solve?

Many (if not most) observability backends are not built to accept arbitrarily large blobs.

However, GenAI observability introduces a use case where users want to log full prompt/response data. In "multi-modal" use cases involving large files (PDFs, PNGs, JPEGs, etc.), users still want these recorded. An example use case: the user prompts an LLM with a PDF and asks it to generate an infographic summarizing the essential details; the LLM responds with summary text as well as a JPEG infographic of the salient points. GenAI semantic conventions are moving toward recording prompt/response details in event bodies (which end up in logs), but many logging backends are not capable of receiving these large blob payloads.

In OpenLLMetry, this has been addressed with an ImageUploader, which is very specific to the Traceloop backend. Generic instrumentation of GenAI frameworks (for example, frameworks moving from OpenLLMetry to the OTel Python repo) will likely require a more generic alternative that provides this capability.

Describe the solution you'd like

At a high level, I would like to separate this out into:

  1. A consumption interface that is aimed at implementors of instrumentation packages.
  2. A producer interface/library that is aimed at those trying to provide a storage backend for such blobs.
  3. Default implementations that use the above to provide useful out-of-the-box functionality
  4. Conventions for actually making use of this mechanism

Common

NOT_UPLOADED

A constant used as the return value when uploading fails or is disabled.

NOT_UPLOADED = '/dev/null'

Blob

Encapsulates the raw payload together with associated properties:

    from typing import Optional

    class Blob(object):

        def __init__(self, raw_bytes: bytes, content_type: Optional[str] = None, labels: Optional[dict] = None):
            ...

        @classmethod
        def from_data_uri(cls, uri: str, labels: Optional[dict] = None) -> "Blob":
            ...

        @property
        def raw_bytes(self) -> bytes:
            ...

        @property
        def content_type(self) -> Optional[str]:
            ...

        @property
        def labels(self) -> dict:
            ...

The from_data_uri function can construct a Blob from a URI like data:image/jpeg;base64,... (or other, similar, base64-encoded data URIs).
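For illustration, a minimal sketch of the parsing involved (the helper name and the lack of error handling are assumptions for illustration, not part of the proposal):

    import base64

    def _parse_data_uri(uri: str) -> tuple[bytes, str]:
        # Expected form: "data:<content-type>;base64,<payload>"
        header, _, payload = uri.partition(",")
        # Fall back to a default content type if none is specified.
        content_type = header[len("data:"):].split(";", 1)[0] or "text/plain"
        return base64.b64decode(payload), content_type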

Consumption Interface

get_blob_uploader

A function used in instrumentation to retrieve the uploader for a certain context/usage:

   def get_blob_uploader(use_case: Optional[str] = None) -> BlobUploader:
       ...

This allows different uploaders to be configured for different contexts/usages (similar to named loggers in Python's logging library). It always returns an uploader of some kind, falling back to a default that reports an error and drops the data.
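As a hypothetical usage sketch from instrumentation code (the use-case name, variables, and event field are illustrative only, not part of the proposal):

    # Upload the payload in the background and record only the returned URI.
    uploader = get_blob_uploader("gen_ai.prompt")
    uri = uploader.upload_async(Blob(image_bytes, content_type="image/jpeg"))
    if uri != NOT_UPLOADED:
        event_body["content_uri"] = uri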

BlobUploader

Provides a way to upload data asynchronously, returning the URL at which the data is expected to land.

from abc import ABC, abstractmethod

class BlobUploader(ABC):

    @abstractmethod
    def upload_async(self, blob: Blob) -> str:
        ...

The upload_async function returns quickly with the URL where the data is expected to end up and, in the background, attempts to write the data to that URL. It may return NOT_UPLOADED if uploading is disabled.

Helpers: detect_content_type, generate_labels_for_X

If instrumentation does not know the content type, it may use the following to detect it:

   def detect_content_type(raw_bytes: bytes) -> str:
      ...
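A minimal sketch of such detection based on magic bytes (the mapping is illustrative and deliberately incomplete; a real implementation might delegate to a dedicated library):

    def detect_content_type(raw_bytes: bytes) -> str:
        # Illustrative magic-byte checks only.
        if raw_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        if raw_bytes.startswith(b"\xff\xd8\xff"):
            return "image/jpeg"
        if raw_bytes.startswith(b"%PDF-"):
            return "application/pdf"
        return "application/octet-stream"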

Instrumentation should use helpers such as the below to populate a minimum set of labels:

    def generate_labels_for_span(trace_id, span_id) -> dict:
       ...

    def generate_labels_for_span_event(trace_id, span_id, event_name, event_index) -> dict:
       ...

    ...

The above ensures that labels include a minimum subset of the metadata needed to tie a blob back to its original source.
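For example, a sketch of what these helpers could return (the label keys here are illustrative; the proposal does not fix their names):

    def generate_labels_for_span(trace_id: str, span_id: str) -> dict:
        # Ties the uploaded blob back to the span that produced it.
        return {"otel_trace_id": trace_id, "otel_span_id": span_id}

    def generate_labels_for_span_event(trace_id: str, span_id: str, event_name: str, event_index: int) -> dict:
        labels = generate_labels_for_span(trace_id, span_id)
        labels.update({"otel_event_name": event_name, "otel_event_index": event_index})
        return labels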

Provider Interface

SimpleBlobUploader

Base class that providers can use to implement blob uploading without dealing with asynchronous processing, queuing, or other associated details.

class SimpleBlobUploader(ABC):

    @abstractmethod
    def generate_destination_uri(self, blob: Blob) -> str:
        ...

    @abstractmethod
    def upload_sync(self, uri: str, blob: Blob) -> None:
        ...

blob_uploader_from_simple_blob_uploader

Method that constructs a BlobUploader given the simpler interface:

  def blob_uploader_from_simple_blob_uploader(simple_uploader: SimpleBlobUploader) -> BlobUploader:
      ...
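A minimal sketch of what the returned adapter could look like, assuming a simple thread-pool scheme (the class name and executor details are assumptions, not part of the proposal):

    from concurrent.futures import ThreadPoolExecutor

    class _SimpleBlobUploaderAdapter(BlobUploader):
        """Illustrative adapter: schedules the provider's synchronous upload
        on a background thread and returns the destination URI immediately."""

        def __init__(self, simple_uploader: SimpleBlobUploader, max_workers: int = 4):
            self._simple_uploader = simple_uploader
            self._executor = ThreadPoolExecutor(max_workers=max_workers)

        def upload_async(self, blob: Blob) -> str:
            uri = self._simple_uploader.generate_destination_uri(blob)
            self._executor.submit(self._simple_uploader.upload_sync, uri, blob)
            return uri

    def blob_uploader_from_simple_blob_uploader(simple_uploader: SimpleBlobUploader) -> BlobUploader:
        return _SimpleBlobUploaderAdapter(simple_uploader)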

Default implementations

Ideally, it should be possible to specify the following environment variables:

  • BLOB_UPLOAD_ENABLED
  • BLOB_UPLOAD_URI_PREFIX

... and to have things work "out of the box" where BLOB_UPLOAD_ENABLED is true and where BLOB_UPLOAD_URI_PREFIX starts with any of the following:

  • "gs://GCS_BUCKET_NAME" (Google Cloud Storage)
  • "s3://S3_BUCKET_NAME" (Amazon S3)
  • "azblob://ACCOUNT/CONTAINER" (Azure Blob)

It should be easy for additional providers to be added, in support of additional prefixes.
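A rough sketch of how the default wiring could dispatch on the configured prefix (the provider class names and the no-op fallback are hypothetical):

    import os

    def _create_default_blob_uploader() -> BlobUploader:
        if os.environ.get("BLOB_UPLOAD_ENABLED", "").lower() != "true":
            return _NoOpBlobUploader()  # hypothetical fallback; always returns NOT_UPLOADED
        prefix = os.environ.get("BLOB_UPLOAD_URI_PREFIX", "")
        if prefix.startswith("gs://"):
            return blob_uploader_from_simple_blob_uploader(GcsSimpleBlobUploader(prefix))
        if prefix.startswith("s3://"):
            return blob_uploader_from_simple_blob_uploader(S3SimpleBlobUploader(prefix))
        if prefix.startswith("azblob://"):
            return blob_uploader_from_simple_blob_uploader(AzureBlobSimpleBlobUploader(prefix))
        return _NoOpBlobUploader()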

Conventions

The above proposal is independent of conventions for actually putting this into use/practice.

I had originally envisioned a more expansive set of conventions for this (Blob Reference Properties), but I think it's also fine for narrower-scoped conventions to be defined where needed (e.g. within GenAI, for a specific GenAI event type, proposing that a specific event body field be a URI referencing content that has been uploaded). This proposal is focused primarily on the technical details of how such uploading is performed and less on when to do so.

I'd ideally like to keep these two things orthogonal: focusing in this proposal on enabling the capability of having such properties come into existence and, separately, defining when we take advantage of this capability.

Describe alternatives you've considered

Not separating out "SimpleBlobUploader" and "blob_uploader_from_simple_blob_uploader"

This would likely lead to duplication of effort related to handling of async processing, however. It might also make it difficult to centrally control configuration related to the async processing such as the size of the queue, the number of threads to dedicate to this background processing, whether to use threads or processes, etc.

Not providing "labels"

This would make it hard to include metadata in the upload that makes it possible to link from a blob to the observability that generated it. For example, suppose that GCS, S3, Azure Blob, or some other provider wanted to enable navigation from the blob in that system to the trace, span, event, etc. that generated the blob; this would be difficult without metadata.

Naming this "ImageUploader"

Although this is what OpenLLMetry calls it, the intended usage is broader in scope.

Naming this something that includes "GenAI"

Although GenAI instrumentation is the motivating use case, this could in theory be useful for other uses. Additionally, while this is for GenAI o11y, the implementation does not itself make use of GenAI, which such a name might suggest.

Making it strictly synchronous

This is unlikely to fly in instrumentation, where blocking on the upload would introduce excessive latency.

Not having a use_case parameter to get_blob_uploader

In the short-term, this would be OK. But this could preclude future enhancements such as to dispatch to different uploader implementations depending on the caller/user of the library. For example, when debugging, it may be useful to capture only a portion of uploads to a local folder while otherwise not injecting/modifying uploads in general. Having this kind of parameter may help to implement such a use case down the line.

Raising an exception instead of returning NOT_UPLOADED

This is a viable alternative approach. However, I imagine that this could lead to accidental failure to handle the exception, whereas recording "/dev/null" as the value of a property indicating that the data never got uploaded is more likely to be OK. In this case, I'm erring on the side of failing gracefully rather than failing loudly (since the severity of the failure is not significant and can mostly be ignored).

Providing a way to be notified when the upload completes or if it fails

I think this could easily be added in the future without breaking changes, but it creates extra complexity. I'd prefer to omit this piece from an initial solution, since I think this complexity will have little return-on-investment.

Additional Context

No response

Would you like to implement a fix?

Yes

@xrmx
Contributor

xrmx commented Dec 4, 2024

Good thing is we are not tracing APIs using blobs at the moment :) Anyway, I think this must be discussed at the GenAI semconv call (https://docs.google.com/document/d/1EKIeDgBGXQPGehUigIRLwAUpRGa7-1kXB736EaYuJ2M/edit?tab=t.0#heading=h.ylazl6464n0c) before doing anything Python-specific.

@michaelsafyan
Author

Thanks. I'll join the SemConv call to discuss further.

It's also recently come to my attention that there is another example of this in the e2e-gen-ai-app-starter-pack from Google (specifically in app/utils/tracing.py), albeit inlining much of the logic inside the exporter implementation.

@codefromthecrypt
Contributor

Also, this has come up pre-OTel and still comes up for some in the ecosystem today. Here are a couple of notes of interest, as "blob processing" isn't a new technique generally and usually presents itself generically for HTTP req/resp Q/A use cases, which also involve content types that can be huge, binary, or both.

A couple of open source variants I'm aware of from the past:

  • Expedia's blob system - pre-OTel, originally written by @mchandramouli here
  • Hypertrace - OTel, cc @pavolloffay

@michaelsafyan
Author

Discussed in today's GenAI SIG meeting.

Outcome of the conversation:

  • Agreed to conceptually separate these ideas:

    1. The building block to enable such uploading (this proposal)
    2. How to actually wire it up into instrumentation (some additional indirection/hook mechanism)
    3. Conventions around which event fields should be represented as URIs of uploaded content or other details of how to map multimodal use cases to this mechanism
  • Agreed to enable forward progress on this proposal with the following caveats:

    1. Should not call this directly from instrumentation (there should be an additional hook/indirection mechanism that is more general purpose, such as being able to control when/whether to do uploading in the first place).
    2. Should keep this relegated to an experimental/underscore package, thereby allowing changes.
  • Suggestion that get_blob_uploader be replaced with some other mechanism to bind the hooks to specific implementations of the BlobUploader interface in auto-instrumentation.

@lmolkova is this a fair summary? With regard to the last one of where/how to bind in lieu of get_blob_uploader, I'm wondering if you might be able to point me to what you had in mind? Thanks for the pointers as well as for allowing us to make forward progress on this while some of the other details remain up in the air.

@michaelsafyan
Author

What's the right place to get started?

I'm thinking it would go something like:

... with:

  • consumption interface in api
  • helpers like SimpleBlobUploader and blob_uploader_from_simple_blob_uploader in util
  • specific storage systems (like GCS, S3, Azblob, etc.) being under backends

I'm assuming the imports would look something like:

from opentelemetry.util._blobupload.api import Blob, BlobUploader, get_blob_uploader

I'll confess I'm not terribly familiar with pyproject.toml .. any advice here?

Thanks!

@aabmass
Member

aabmass commented Dec 13, 2024

> I'm assuming the imports would look something like:
>
> from opentelemetry.util._blobupload.api import Blob, BlobUploader, get_blob_uploader
>
> I'll confess I'm not terribly familiar with pyproject.toml .. any advice here?
>
> Thanks!

I think we can define the interfaces and logic inside of the opentelemetry-instrumentation package (https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/opentelemetry-instrumentation), which already has some instrumentation-wide utils. This package is technically unstable, but it would probably be good to guard with an underscore anyway.

For actual blob uploader impls, we could either use optional dependencies (pip install opentelemetry-instrumentation[blob-uploader-foo]) and define them in this package, or split them into separate artifacts like we have for exporters.

@michaelsafyan
Author

Thanks, Aaron. I will get started on that shortly.

@codefromthecrypt
Contributor

I think, due to us having separated out the topics, this also covers the frequently requested HTTP-layer req/resp logging. Is it valid to steer folks interested in that this way?

@michaelsafyan
Author

That would seem reasonable to me.

@michaelsafyan
Author

Started draft of pull request here:

#3122

It's still a work in progress, but I figured a draft would be good in that it will make it easier to follow along as well as to leave comments during development.

@michaelsafyan
Author

I still need to address some style and polish issues, but the PR is essentially there now in:

#3122

@samuelcolvin
Contributor

I've reviewed #3122; I think there is a much simpler and more scalable approach we should take.

@adriangb
Contributor

Thank you for working on this @michaelsafyan!
I'm very excited to see this proposal! I think this is a very useful feature.

We've been working on something very similar at Logfire. We've thought hard about this and come to the realisation that it's not as easy as it seems. I'll share some off-the-cuff points from these conversations that I think are worth considering:

  • This is not gen-ai specific. We've had users request e.g. to log every request/response body in a plain old web server. In my mind this makes the feature even more important and a better candidate to be a fundamental part of OpenTelemetry.
  • Users will want to query this data. For example, if you have an 80kB prompt (say 4 letters per token and 20k tokens), that might make sense to upload as a blob (many backends would currently reject spans that big, never mind the resultant 10MB export), but users might still want to do a query for prompt like '%you are a professional%' or similar.
  • Frontends (as in the UI via which users interact with their data) should be able to display this data. A common request we get is to upload images and be able to visualize them seamlessly in span details.
  • Exporting sync can block the current process, but exporting async means not only complexity but also footguns with memory usage, backpressure, and cross-thread communication (think of the upcoming GIL-free Python builds). For example, imagine you're generating things you want to blob in a hot loop and trying to upload them. At least until they get uploaded, they must be kept in memory. This means you build up potentially a lot more memory use than if you didn't upload them. Maybe this is expected for users who understand what's going on, but a lot of users would not expect that at all.
  • Oftentimes users won't even know what they want to blob or not, especially when it's not even the end user emitting the data. Consider the case of an agent framework or framework instrumentation that is actually emitting the telemetry. Does it blob anything that is potentially large? If only some backends support it, it has overhead and makes things harder to query, which would be very annoying for end users. Does it never blob anything? Then randomly some requests (e.g. when the prompt exceeds some threshold or the response is an image) will fail silently when the SDK tries to export them and the backend rejects them because they're too big, etc.
  • How does this interact with sampling? I can imagine these two things are often combined. It would be a shame if you do a lot of work on both the client side and the server side to upload data that is then immediately orphaned because the traces referencing it are sampled out in the OTel collector. Can a collector communicate upstream what the result of sampling was, such that the backend can discard or not further process those blobs?
  • Does each backend need to make an implementation for each language and guide users towards configuring it? What if a user wants to send data to multiple backends? This seems to be a very common thing nowadays, we see a lot of our users combine a traditional observability backend like DataDog, etc. with a Gen AI specific backend, both via OTEL. Can we come up with a version of this that operates at the protocol level? Imagine we had a /v1/blobs endpoint or something just as we have /v1/traces. I don't think it'd be that hard to design an endpoint that returns links to presigned POST or PUT URLs in a way that's compatible with most object storage providers.

Personally I would like:

  • For the OTEL SDK/API to support basic uploading of bytes: Why are bytes attributes encoded as strings? opentelemetry-python#3596. This first step would at least unblock basic use cases for uploading images without having to deal with all of the questions above.
  • An API that allows SDKs (e.g. via an exporter) to determine if data should be uploaded inline or via a blob.

All of this said, the design we've been iterating on internally looks like this:

with logfire.span(
    'asking {model}',
    model='gpt',
    prompt=TextBlob(text_data),
    image=Blob(binary_data, content_type='png'),
):
    ...

Or more in an OTEL SDK style:

with tracer.span(
    'asking llm',
    attributes={'model': 'gpt'},
    blobs={'prompt': TextBlob(text_data), 'image': Blob(binary_data, content_type='png')},
):
    ...

(This can even be typed as `blobs: list[Blob[Any]]`.)

The SDK would then decide if the attribute should actually be uploaded via a presigned URL or if it should just be sent inline, based on the size and type of the attribute in our case.

When the data is blobbed, what would be uploaded in its place would be an authenticated URL.
For example, for us it might be https://logfire-api.pydantic.dev/v1/blobs/{blob-id}, where blob-id might be {trace-id}-{span-id}-{uuid} or something, idk.

To query it users will have to unblob the data:

select *
from records
where unblob(attributes->>'prompt') like '%you are a professional%'

This would be a no-op for things uploaded inline, or would do an efficient, batched, parallel reconciliation of data that is stored out of line in object storage.

Because the data stored is authenticated URLs it's possible for:

  • A query engine with the unblob function above to identify and download the blobs using its existing authentication.
  • The frontend to download the data and display it inline using the user's credentials.
  • Bulk data exports to still retain access to the data, e.g. if users bulk download their data they can then still retrieve their blobs by making requests with a service account read token.

And there are no issues with expiring presigned URLs, etc.

For the sampling issue, the best I can think of is that the wire protocol supports some way of sending a summary of sampling decisions. E.g. the collector sends a payload to /v1/sampling to say "hey, I sampled these trace IDs out, please drop any data associated with them". But that would require much more extensive ecosystem changes.

@samuelcolvin
Contributor

Thanks @adriangb.

I think sampling and memory are the most immediately problematic concerns which we need conceptual answers for before continuing work on #3122.

My proposal:

  • Can we defer the upload until the batch exporter, i.e. after sampling, and thereby avoid doing the upload if we're sampling in that Python environment? Obviously this won't help if you sample later on, but it's better than nothing.
  • Can we write blobs to a temp file in the main thread, then read the file, upload it, and delete it in a separate thread after sampling (or just delete the file if the spans are sampled out)? A rough sketch follows below.
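A rough sketch of the temp-file idea (all names are illustrative; this assumes the sampling decision is available by the time the background work runs):

    import os
    import tempfile
    import threading

    def stage_blob_to_disk(raw_bytes: bytes) -> str:
        # Write the payload in the main thread so it need not stay in memory.
        fd, path = tempfile.mkstemp(prefix="otel-blob-")
        with os.fdopen(fd, "wb") as f:
            f.write(raw_bytes)
        return path

    def upload_or_discard(path: str, sampled: bool, upload_fn) -> None:
        # After the sampling decision, upload (or just delete) on a background thread.
        def _work():
            try:
                if sampled:
                    with open(path, "rb") as f:
                        upload_fn(f.read())
            finally:
                os.remove(path)
        threading.Thread(target=_work, daemon=True).start()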

@adriangb
Contributor

adriangb commented Jan 21, 2025

Those are great suggestions to do something, but yeah, I fear those are questions we do need to answer.

My suggestion is that a first step is to support binary data as attributes. That solves at least some use cases and should not be controversial at all; it just needs implementation work.

@michaelsafyan
Author

@adriangb thanks for raising those issues.

I agree that this is not GenAI specific.

With respect to pre-signed URLs, I do think there is value in being able to actually hide the content from the observability backend so that authorization is conferred out-of-band of the URL; for example, some of this data may be highly sensitive (more so than other observability data), so separating auth from linking can actually be beneficial here. (Yes, if the backend is granted the auth, then being able to display inline, index, etc. is good; but it should also be possible to create linkage without granting access to the telemetry backend and without every user of the telemetry backend having access to the underlying content.)

With respect to sampling, I agree that this should be applied after the span is sampled. (And there should maybe even be a SampledBlobUploader that allows a subset of blobs from sampled spans to be uploaded, so that even in the case of sampled spans, not all of the blobs get uploaded). To keep things narrow and make forward progress, though, I wanted to limit the scope to just the uploading machinery mechanics and defer how to hook it up into the overall system into a separate scope of work.

Making this a part of the protocol is an interesting thought. If the main concern, though, is having to modify multiple libraries in multiple languages, implementing in Go and adding to the Collector can cover pretty much all cases (by having other languages route to the Collector, which then would do the blob upload within the collector). Having to implement a blob upload endpoint is also a lot of work (in addition to being difficult organizationally in terms of soliciting agreement from all stakeholders and ensuring a protocol that everyone can agree on).

@michaelsafyan
Author

@samuelcolvin thanks for those suggestions.

Deferring the upload until sampling makes sense. I had assumed, perhaps incorrectly, that this code wouldn't effectively get called unless the span was being sampled. Do you have a particular mechanism in mind for how the sampling decision could/should get propagated into the uploader code?

Collecting the data in temp files and uploading later also seems like a reasonable request.

Happy to make these changes if there is agreement to proceed with the approach in principle.
