Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobUploader utilities to enable handling of large data in instrumentation #3122

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions opentelemetry-instrumentation/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ dependencies = [
"packaging >= 18.0",
]

[project.optional-dependencies]
gcs = [
"google-cloud-storage==2.19.0"
]
magic = [
"python-magic==0.4.27"
]

[project.scripts]
opentelemetry-bootstrap = "opentelemetry.instrumentation.bootstrap:run"
opentelemetry-instrument = "opentelemetry.instrumentation.auto_instrumentation:run"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Exposes API methods to callers from the package name."""

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.blob_uploader import (
BlobUploader,
)
from opentelemetry.instrumentation._blobupload.api.constants import (
NOT_UPLOADED,
)
from opentelemetry.instrumentation._blobupload.api.content_type import (
detect_content_type,
)
from opentelemetry.instrumentation._blobupload.api.labels import (
generate_labels_for_event,
generate_labels_for_span,
generate_labels_for_span_event,
)
from opentelemetry.instrumentation._blobupload.api.provider import (
BlobUploaderProvider,
get_blob_uploader,
set_blob_uploader_provider,
)

# '__all__' must contain the *names* of the public symbols as strings.
# Listing the objects themselves breaks 'from ... import *' (Python
# requires string entries and raises TypeError otherwise) and confuses
# tooling that reads '__all__'.
__all__ = [
    "Blob",
    "BlobUploader",
    "NOT_UPLOADED",
    "detect_content_type",
    "generate_labels_for_event",
    "generate_labels_for_span",
    "generate_labels_for_span_event",
    "BlobUploaderProvider",
    "get_blob_uploader",
    "set_blob_uploader_provider",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import base64
import json

from types import MappingProxyType as _frozendict
from typing import Mapping, Dict, Optional


class Blob(object):
    """Represents an opaque binary object and associated metadata.

    The payload is exposed via 'raw_bytes'; see the property docstrings
    for the associated 'content_type' and 'labels' metadata.
    """

    def __init__(
        self,
        raw_bytes: bytes,
        content_type: Optional[str] = None,
        labels: Optional[Mapping[str, str]] = None,
    ):
        """Initialize the blob with an explicit set of properties.

        Args:
            raw_bytes: the required payload
            content_type: the MIME type describing the type of data in the payload
            labels: additional key/value data about the Blob
        """
        self._raw_bytes = raw_bytes
        self._content_type = content_type
        # Defensive copy: 'dict()' accepts any Mapping, so the separate
        # dict/non-dict copy branches of the original are unnecessary, and
        # later mutation of the caller's mapping cannot change this Blob.
        self._labels = dict(labels) if labels is not None else {}

    @classmethod
    def from_data_uri(cls, uri: str, labels: Optional[dict] = None) -> "Blob":
        """Instantiate a blob from a 'data:...' URI.

        Implemented as a classmethod constructing 'cls' so that subclasses
        of Blob inherit a correctly-typed factory.

        Args:
            uri: A URI in the 'data:' format. Supports a subset of 'data:' URIs
                that encode the data with the 'base64' extension and that include
                a content type. Should work with any normal 'image/jpeg', 'image/png',
                'application/pdf', 'audio/aac', and many others. DOES NOT SUPPORT
                encoding data as percent-encoded text (no "base64").
            labels: Additional key/value data to include in the constructed Blob.

        Raises:
            ValueError: if 'uri' lacks the 'data:' prefix, the 'base64'
                extension, or a ';'-delimited content type.
        """
        if not uri.startswith("data:"):
            raise ValueError(
                f'Invalid "uri"; expected "data:" prefix. Found: "{uri}"'
            )
        if ";base64," not in uri:
            raise ValueError(
                f'Invalid "uri"; expected ";base64," section. Found: "{uri}"'
            )
        after_data_prefix = uri[len("data:"):]
        if ";" not in after_data_prefix:
            raise ValueError(
                f'Invalid "uri"; expected ";" in URI. Found: "{uri}"'
            )
        content_type, remaining = after_data_prefix.split(";", 1)
        # Skip optional parameters (e.g. ';charset=...') between the content
        # type and the 'base64,' marker. Termination is guaranteed by the
        # ';base64,' membership check above, which also made the original
        # post-loop 'assert' (stripped under -O anyway) redundant.
        while not remaining.startswith("base64,"):
            _, remaining = remaining.split(";", 1)
        base64_encoded_content = remaining[len("base64,"):]
        raw_bytes = base64.b64decode(base64_encoded_content)
        return cls(raw_bytes, content_type=content_type, labels=labels)

    @property
    def raw_bytes(self) -> bytes:
        """Returns the raw bytes (payload) of this Blob."""
        return self._raw_bytes

    @property
    def content_type(self) -> Optional[str]:
        """Returns the content type (or None) of this Blob."""
        return self._content_type

    @property
    def labels(self) -> Mapping[str, str]:
        """Returns a read-only view of the key/value metadata of this Blob."""
        return _frozendict(self._labels)

    def __eq__(self, o):
        # Returning NotImplemented (rather than False) for non-Blob operands
        # lets the other operand's __eq__ participate, per the data-model
        # convention; '==' still evaluates False when both sides decline.
        if not isinstance(o, Blob):
            return NotImplemented
        return (
            self.raw_bytes == o.raw_bytes
            and self.content_type == o.content_type
            and self.labels == o.labels
        )

    def __repr__(self):
        params = [repr(self._raw_bytes)]
        if self._content_type is not None:
            params.append(f"content_type={self._content_type!r}")
        if self._labels:
            params.append(f"labels={json.dumps(self._labels, sort_keys=True)}")
        return f"Blob({', '.join(params)})"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Defines an interface for performing asynchronous blob uploading."""

import abc
michaelsafyan marked this conversation as resolved.
Show resolved Hide resolved

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.constants import (
NOT_UPLOADED,
)


class BlobUploader(abc.ABC):
    """Abstract interface for a component that uploads Blob payloads.

    Concrete subclasses decide where and how the data is written; callers
    only see the destination URI returned from 'upload_async'.
    """

    @abc.abstractmethod
    def upload_async(self, blob: Blob) -> str:
        """Begin uploading 'blob' and return the URI of its destination.

        The base implementation performs no upload and returns the
        NOT_UPLOADED sentinel.

        NOTE(review): despite the '_async' name, the signature is
        synchronous; confirm whether a coroutine-based variant is needed.
        """
        return NOT_UPLOADED
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Defines constants that are used by the '_blobupload' package."""

# Sentinel URI returned by a BlobUploader to signal that the supplied
# blob was not (and will not be) uploaded anywhere.
NOT_UPLOADED = '/dev/null'
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Provides utilities for automatic content-type detection."""


# Shim so callers never need to know whether the optional 'magic'
# dependency is importable.
class _FallBackModule(object):
    """Minimal stand-in exposing the slice of the 'magic' API we call."""

    def from_buffer(self, raw_bytes, mime=True):
        """Degraded detection: always reports a generic binary type."""
        return "application/octet-stream"


# Bind '_module' to the real 'magic' module when available, otherwise to
# the fallback shim above.
try:
    import magic

    _module = magic
except ImportError:
    _module = _FallBackModule()


def detect_content_type(raw_bytes: bytes) -> str:
    """Attempts to infer the content type of the specified data."""
    if not raw_bytes:
        return "application/octet-stream"
    return _module.from_buffer(raw_bytes, mime=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Provides utilities for providing basic identifying labels for blobs."""


def generate_labels_for_span(trace_id: str, span_id: str) -> dict:
    """Returns identifying key/value labels for a span."""
    return dict(otel_type="span", trace_id=trace_id, span_id=span_id)


def generate_labels_for_event(
    trace_id: str, span_id: str, event_name: str
) -> dict:
    """Returns identifying key/value labels for an event."""
    # Start from the span labels, then re-type the entry as an event and
    # attach the event's name.
    labels = generate_labels_for_span(trace_id, span_id)
    labels["otel_type"] = "event"
    labels["event_name"] = event_name
    return labels


def generate_labels_for_span_event(
    trace_id: str, span_id: str, event_name: str, event_index: int
) -> dict:
    """Returns identifying key/value labels for a span event."""
    # Start from the event labels, then re-type the entry as a span event
    # and record the event's position within the span.
    labels = generate_labels_for_event(trace_id, span_id, event_name)
    labels["otel_type"] = "span_event"
    labels["event_index"] = event_index
    return labels
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import abc
import logging
from typing import Optional

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.blob_uploader import (
BlobUploader,
)
from opentelemetry.instrumentation._blobupload.api.constants import NOT_UPLOADED


_logger = logging.getLogger(__name__)


class _NoOpBlobUploader(BlobUploader):
    """BlobUploader that discards every blob without uploading anything."""

    def upload_async(self, blob: Blob) -> str:
        """Ignores 'blob' and reports the NOT_UPLOADED sentinel."""
        return NOT_UPLOADED


class BlobUploaderProvider(abc.ABC):
    """Pure abstract base for configuring how to provide a BlobUploader.

    NOTE(review): although described as "pure abstract", this class
    declares no abstract methods and ships a working default that returns
    a no-op uploader; subclasses override 'get_blob_uploader' to supply a
    real one.
    """

    def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
        """Returns a BlobUploader for the specified use case.

        Args:
            use_case: An optional use case that describes what the uploader is for. This could
                name a particular package, class, or instrumentation. It is intended to allow
                users to differentiate upload behavior based on the target instrumentation.

        Returns:
            A BlobUploader that is appropriate for the use case.
        """
        return _NoOpBlobUploader()


class _DefaultBlobUploaderProvider(BlobUploaderProvider):
    """Default provider used when none has been configured."""

    def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
        """Returns a no-op uploader, warning that nothing is configured.

        Args:
            use_case: optional description of what the uploader is for;
                echoed in the warning to help identify the caller.
        """
        # Lazy %-style logging args: the message is only formatted if the
        # warning is actually emitted (was eager str.format()).
        _logger.warning(
            "No BlobUploaderProvider configured; returning a no-op for "
            "use case \"%s\". Use 'set_blob_uploader_provider()' to configure.",
            use_case if use_case else "(None)",
        )
        return _NoOpBlobUploader()


_blob_uploader_provider = _DefaultBlobUploaderProvider()


def set_blob_uploader_provider(provider: BlobUploaderProvider) -> BlobUploaderProvider:
    """Allows configuring the behavior of 'get_blob_uploader'.

    Args:
        provider: the provider future 'get_blob_uploader' calls will use.

    Returns:
        The previously configured provider (the built-in default on the
        first call), so callers can restore it later.
    """
    global _blob_uploader_provider
    old_provider = _blob_uploader_provider
    _blob_uploader_provider = provider
    return old_provider


def get_blob_uploader(use_case: Optional[str] = None) -> BlobUploader:
    """Returns a BlobUploader for the use case from the configured provider.

    Args:
        use_case: optional description of the instrumentation requesting
            the uploader; forwarded verbatim to the provider.
    """
    return _blob_uploader_provider.get_blob_uploader(use_case)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from opentelemetry.instrumentation._blobupload.backend.google.gcs._gcs_impl import (
    GcsBlobUploader,
)

# '__all__' entries must be strings naming the public symbols; listing the
# object itself breaks 'from ... import *' and API-listing tools.
__all__ = [
    "GcsBlobUploader",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from typing import Any

_logger = logging.getLogger(__name__)

# Module-level registry for the optional GCS integration; populated by
# 'set_gcs_client_factory' / 'set_gcs_blob_from_uri' below.
_gcs_initialized = False
_gcs_client_factory = None
_gcs_blob_from_uri = None


# Alias for the dynamically registered GCS client class. A plain
# assignment (not 'typing.TypeAlias', which only exists on Python 3.10+)
# keeps this module importable on older interpreters, and the name is
# rebound at runtime by 'set_gcs_client_factory' anyway.
GcsClientType = Any


def set_gcs_client_factory(gcs_client_type, client_factory):
    """Registers the GCS client class and a zero-arg factory producing it.

    Args:
        gcs_client_type: the class object to publish as 'GcsClientType'.
        client_factory: zero-argument callable returning a client instance
            (e.g. the 'google.cloud.storage.Client' class itself).
    """
    global _gcs_initialized
    global _gcs_client_factory
    global GcsClientType
    if _gcs_initialized:
        _logger.warning('Replacing default GCS client factory')
    GcsClientType = gcs_client_type
    _gcs_client_factory = client_factory
    # The integration counts as initialized only once both the factory
    # and the blob-from-URI function have been registered.
    if _gcs_client_factory and _gcs_blob_from_uri:
        _gcs_initialized = True


def set_gcs_blob_from_uri(blob_from_uri):
    """Registers the function used to resolve a URI to a GCS blob object.

    Args:
        blob_from_uri: callable invoked as 'blob_from_uri(uri, client=...)'
            (e.g. 'google.cloud.storage.blob.Blob.from_uri').
    """
    global _gcs_initialized
    global _gcs_blob_from_uri
    if _gcs_initialized:
        _logger.warning('Replacing default GCS blob_from_uri method')
    _gcs_blob_from_uri = blob_from_uri
    # Initialized only once both halves of the integration are registered.
    if _gcs_client_factory and _gcs_blob_from_uri:
        _gcs_initialized = True


def is_gcs_initialized():
    """Returns True once both a client factory and blob_from_uri are registered."""
    return _gcs_initialized


def create_gcs_client():
    """Builds a GCS client via the registered factory; None if unregistered."""
    factory = _gcs_client_factory
    if factory is None:
        return None
    return factory()


def blob_from_uri(uri, client):
    """Resolves 'uri' via the registered blob_from_uri; None if unregistered."""
    resolver = _gcs_blob_from_uri
    if resolver is None:
        return None
    return resolver(uri, client=client)


# Auto-register the real GCS integration when the optional dependency is
# installed; otherwise leave the module in its no-op state.
try:
    from google.cloud.storage import Client as _GcsClient
    from google.cloud.storage.blob import Blob as _GcsBlob

    set_gcs_client_factory(_GcsClient, _GcsClient)
    # Newer google-cloud-storage exposes 'Blob.from_uri'; older releases
    # call it 'from_string'. Use getattr defaults so a missing attribute
    # cannot raise AttributeError, which the 'except ImportError' below
    # would not catch (the original nested getattr had no default).
    _resolver = getattr(_GcsBlob, 'from_uri', None) or getattr(
        _GcsBlob, 'from_string', None
    )
    if _resolver is not None:
        set_gcs_blob_from_uri(_resolver)
    _logger.debug('Found "google-cloud-storage" optional dependency and successfully registered it.')
except ImportError:
    _logger.warning('Missing optional "google-cloud-storage" dependency.')
Loading
Loading