Skip to content

Commit

Permalink
Add streaming file uploads. (#576)
Browse files Browse the repository at this point in the history
This adds the ability to upload a file to TileDB Cloud, using a
streaming request to avoid loading the whole file in memory at once.
  • Loading branch information
thetorpedodog authored Jun 10, 2024
1 parent 974fc15 commit 71330b0
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/tiledb/cloud/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def run():
return wrapper # type: ignore[return-value]


def release_connection(resp: urllib3.HTTPResponse) -> None:
def release_connection(resp: urllib3.BaseHTTPResponse) -> None:
"""Release the backing connection of this HTTPResponse to the pool.
When a call is made with ``preload_content=False``, the response body is not
Expand Down
3 changes: 3 additions & 0 deletions src/tiledb/cloud/files/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from . import udfs
from . import utils

upload = utils.upload_file

__all__ = (
"indexing",
"ingestion",
"udfs",
"upload",
"utils",
)
118 changes: 109 additions & 9 deletions src/tiledb/cloud/files/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import base64
import os
import re
import urllib.parse
import warnings
from fnmatch import fnmatch
from typing import Optional, Tuple, Union
from typing import Mapping, Optional, Tuple, Union

import tiledb
import tiledb.cloud.tiledb_cloud_error as tce
from tiledb.cloud import array
from tiledb.cloud import client
from tiledb.cloud import config
from tiledb.cloud import rest_api
from tiledb.cloud import tiledb_cloud_error
from tiledb.cloud._common import utils
from tiledb.cloud.rest_api import ApiException as GenApiException
from tiledb.cloud.rest_api import configuration
from tiledb.cloud.rest_api import models


Expand Down Expand Up @@ -93,6 +99,10 @@ def create_file(
raise tiledb_cloud_error.check_exc(exc) from None


_EXPORT_CHUNK_SIZE = 512 * 1024 * 1024
"""The number of bytes we will attempt to read at once when exporting a file."""


def export_file_local(
uri: str,
output_uri: str,
Expand All @@ -113,16 +123,19 @@ def export_file_local(
vfs = tiledb.VFS(ctx=ctx)
with tiledb.open(uri, ctx=ctx, timestamp=timestamp) as A:
file_size = A.meta["file_size"]
# Row major partial export of single attribute
iterable = A.query(
attrs=["contents"], return_incomplete=True, order="C"
).multi_index[: file_size - 1]
# Most Python indices are [start, end), but TileDB indices are
# [start, end], which is why we subtract 1 here.

with vfs.open(output_uri, "wb") as fh:
for part in iterable:
# We can't multi_index with return_incomplete over a dense array
# so here we manually chunk our request.
for start in range(0, file_size, _EXPORT_CHUNK_SIZE):
end = min(start + _EXPORT_CHUNK_SIZE, file_size)
part = A.query(
attrs=["contents"],
order="C",
)[:end]
fh.write(part["contents"])
# Get rid of this immediately so the GC can claim it ASAP
# if needed.
del part

return models.FileExported(output_uri=output_uri)

Expand Down Expand Up @@ -164,3 +177,90 @@ def export_file(
)
except GenApiException as exc:
raise tiledb_cloud_error.check_exc(exc) from None


def upload_file(
input_uri: str,
output_uri: str,
*,
filename: Optional[str] = None,
content_type: str = "application/octet-stream",
access_credentials_name: Optional[str] = None,
) -> str:
"""Uploads a file to TileDB Cloud.
:param input_uri: The URI or path of the input file. May be an ordinary path
or any URI accessible via TileDB VFS.
:param output_uri: The TileDB URI to write the file to.
:param filename: If present, the value to store as the original filename.
:param content_type: The MIME type of the file.
:param access_credentials_name: If present, the name of the credentials
to use when writing the uploaded file to backend storage instead of
the defaults.
:return: The ``tiledb://`` URI of the uploaded file.
"""
namespace, name = array.split_uri(output_uri)
client.user_profile
vfs = tiledb.VFS()

with vfs.open(input_uri, "rb") as infile:
size = infile.seek(0, 2)
infile.seek(0)
headers = {
"content-type": content_type,
**_auth_headers(config.config),
}
if access_credentials_name:
headers["x-tiledb-cloud-access-credentials-name"] = access_credentials_name

pool = client.client._client_v2.rest_client.pool_manager

query = urllib.parse.urlencode(
{
"filename": filename or os.path.basename(input_uri),
"filesize": size,
}
)

namespace = urllib.parse.quote(namespace)
name = urllib.parse.quote(name, safe="")

request_url = (
f"{config.config.host}/v2/notebooks/{namespace}/{name}/upload?{query}"
)

while True:
resp = pool.request(
method="POST",
url=request_url,
body=infile,
headers=headers,
chunked=True,
redirect=False,
)
try:
request_url = resp.get_redirect_location()
if request_url:
# If we got redirected, we need to rewind the file
# so we can send it to the actual location.
infile.seek(0)
continue
json_response = resp.json()
if not 200 <= resp.status < 300:
raise tce.TileDBCloudError(json_response["message"])
return json_response["output_uri"]
except (KeyError, ValueError) as base:
raise tce.TileDBCloudError(resp.data) from base
finally:
utils.release_connection(resp)


def _auth_headers(cfg: configuration.Configuration) -> Mapping[str, object]:
key = cfg.get_api_key_with_prefix("X-TILEDB-REST-API-KEY")
if key:
return {"x-tiledb-rest-api-key": key}
if cfg.username and cfg.password:
basic = base64.b64encode(f"{cfg.username}:{cfg.password}".encode("utf-8"))
return {"authorization": basic}
return {}
# No authentication has been provided. Do nothing.
23 changes: 23 additions & 0 deletions tests/test_file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import hashlib
import os
import pathlib
import tempfile
import unittest
from typing import List

import tiledb
from tiledb.cloud import array
from tiledb.cloud import client
from tiledb.cloud import groups
from tiledb.cloud._common import testonly
Expand Down Expand Up @@ -167,6 +169,27 @@ def _cleanup_residual_test_arrays(array_uris: List[str]) -> None:
continue


class UploadTest(unittest.TestCase):
def test_round_trip(self):
namespace = client.default_user().username
default_path = client.default_user().default_s3_path
output = f"{default_path}/{testonly.random_name('upload')}"
uri = file_utils.upload_file(
__file__,
f"tiledb://{namespace}/{output}",
content_type="text/plain",
)
try:
with tempfile.TemporaryDirectory() as tmpdir:
tmppath = pathlib.Path(tmpdir) / "output"
file_utils.export_file_local(uri, str(tmppath))
me = pathlib.Path(__file__).read_bytes()
downloaded = tmppath.read_bytes()
self.assertEqual(me, downloaded)
finally:
array.delete_array(uri)


class TestFileIngestion(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
Expand Down

0 comments on commit 71330b0

Please sign in to comment.