Skip to content

Commit

Permalink
Updated luigi.contrib.azureblob to 12.x.y series of azure.storage.blob
Browse files Browse the repository at this point in the history
  • Loading branch information
bcamel committed May 19, 2024
1 parent f55e5f2 commit 56dc5e1
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 45 deletions.
109 changes: 76 additions & 33 deletions luigi/contrib/azureblob.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import datetime

from azure.storage.blob import blockblobservice
from azure.storage.blob import BlobServiceClient

from luigi.format import get_default_format
from luigi.target import FileAlreadyExists, FileSystem, AtomicLocalFile, FileSystemTarget
Expand Down Expand Up @@ -62,60 +62,101 @@ def __init__(self, account_name=None, account_key=None, sas_token=None, **kwargs
* `custom_domain` - The custom domain to use. This can be set in the Azure Portal. For example, ‘www.mydomain.com’.
* `token_credential` - A token credential used to authenticate HTTPS requests. The token value should be updated before its expiration.
"""
self.options = {"account_name": account_name, "account_key": account_key, "sas_token": sas_token}
if kwargs.get("custom_domain"):
account_url = "{protocol}://{custom_domain}/{account_name}".format(protocol=kwargs.get("protocol", "https"),
custom_domain=kwargs.get("custom_domain"),
account_name=account_name)
else:
account_url = "{protocol}://{account_name}.blob.{endpoint_suffix}".format(protocol=kwargs.get("protocol",

Check warning on line 70 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L70

Added line #L70 was not covered by tests
"https"),
account_name=account_name,
endpoint_suffix=kwargs.get(
"endpoint_suffix",
"core.windows.net"))

self.options = {
"account_name": account_name,
"account_key": account_key,
"account_url": account_url,
"sas_token": sas_token}
self.kwargs = kwargs

@property
def connection(self):
return blockblobservice.BlockBlobService(account_name=self.options.get("account_name"),
account_key=self.options.get("account_key"),
sas_token=self.options.get("sas_token"),
protocol=self.kwargs.get("protocol"),
connection_string=self.kwargs.get("connection_string"),
endpoint_suffix=self.kwargs.get("endpoint_suffix"),
custom_domain=self.kwargs.get("custom_domain"),
is_emulated=self.kwargs.get("is_emulated") or False)
if self.kwargs.get("connection_string"):
return BlobServiceClient.from_connection_string(conn_str=self.kwargs.get("connection_string"),

Check warning on line 87 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L86-L87

Added lines #L86 - L87 were not covered by tests
**self.kwargs)
else:
return BlobServiceClient(account_url=self.options.get("account_url"),

Check warning on line 90 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L90

Added line #L90 was not covered by tests
credential=self.options.get("account_key") or self.options.get("sas_token"),
**self.kwargs)

def container_client(self, container_name):
return self.connection.get_container_client(container_name)

Check warning on line 95 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L95

Added line #L95 was not covered by tests

def blob_client(self, container_name, blob_name):
container_client = self.container_client(container_name)
return container_client.get_blob_client(blob_name)

Check warning on line 99 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L98-L99

Added lines #L98 - L99 were not covered by tests

def upload(self, tmp_path, container, blob, **kwargs):
logging.debug("Uploading file '{tmp_path}' to container '{container}' and blob '{blob}'".format(
tmp_path=tmp_path, container=container, blob=blob))
self.create_container(container)
lease_id = self.connection.acquire_blob_lease(container, blob)\
if self.exists("{container}/{blob}".format(container=container, blob=blob)) else None
lease = None
blob_client = self.blob_client(container, blob)
if blob_client.exists():
lease = blob_client.acquire_lease()

Check warning on line 108 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L105-L108

Added lines #L105 - L108 were not covered by tests
try:
self.connection.create_blob_from_path(container, blob, tmp_path, lease_id=lease_id, progress_callback=kwargs.get("progress_callback"))
with open(tmp_path, 'rb') as data:
blob_client.upload_blob(data,

Check warning on line 111 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L110-L111

Added lines #L110 - L111 were not covered by tests
overwrite=True,
lease=lease,
progress_hook=kwargs.get("progress_callback"))
finally:
if lease_id is not None:
self.connection.release_blob_lease(container, blob, lease_id)
if lease is not None:
lease.release()

Check warning on line 117 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L116-L117

Added lines #L116 - L117 were not covered by tests

def download_as_bytes(self, container, blob, bytes_to_read=None):
start_range, end_range = (0, bytes_to_read-1) if bytes_to_read is not None else (None, None)
logging.debug("Downloading from container '{container}' and blob '{blob}' as bytes".format(
container=container, blob=blob))
return self.connection.get_blob_to_bytes(container, blob, start_range=start_range, end_range=end_range).content
blob_client = self.blob_client(container, blob)
download_stream = blob_client.download_blob(offset=0, length=bytes_to_read) if bytes_to_read \

Check warning on line 123 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L122-L123

Added lines #L122 - L123 were not covered by tests
else blob_client.download_blob()
return download_stream.readall()

Check warning on line 125 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L125

Added line #L125 was not covered by tests

def download_as_file(self, container, blob, location):
logging.debug("Downloading from container '{container}' and blob '{blob}' to {location}".format(
container=container, blob=blob, location=location))
return self.connection.get_blob_to_path(container, blob, location)
blob_client = self.blob_client(container, blob)
with open(location, 'wb') as file:
download_stream = blob_client.download_blob()
file.write(download_stream.readall())
return blob_client.get_blob_properties()

Check warning on line 134 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L130-L134

Added lines #L130 - L134 were not covered by tests

def create_container(self, container_name):
return self.connection.create_container(container_name)
if not self.exists(container_name):
return self.connection.create_container(container_name)

Check warning on line 138 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L137-L138

Added lines #L137 - L138 were not covered by tests

def delete_container(self, container_name):
lease_id = self.connection.acquire_container_lease(container_name)
self.connection.delete_container(container_name, lease_id=lease_id)
container_client = self.container_client(container_name)
lease = container_client.acquire_lease()
container_client.delete_container(lease=lease)

Check warning on line 143 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L141-L143

Added lines #L141 - L143 were not covered by tests

def exists(self, path):
container, blob = self.splitfilepath(path)
return self.connection.exists(container, blob)
if blob is None:
return self.container_client(container).exists()

Check warning on line 148 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L147-L148

Added lines #L147 - L148 were not covered by tests
else:
return self.blob_client(container, blob).exists()

Check warning on line 150 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L150

Added line #L150 was not covered by tests

def remove(self, path, recursive=True, skip_trash=True):
container, blob = self.splitfilepath(path)
if not self.exists(path):
return False
lease_id = self.connection.acquire_blob_lease(container, blob)
self.connection.delete_blob(container, blob, lease_id=lease_id)

container, blob = self.splitfilepath(path)
blob_client = self.blob_client(container, blob)
lease = blob_client.acquire_lease()
blob_client.delete_blob(lease=lease)

Check warning on line 159 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L156-L159

Added lines #L156 - L159 were not covered by tests
return True

def mkdir(self, path, parents=True, raise_if_exists=False):
Expand Down Expand Up @@ -148,16 +189,18 @@ def copy(self, path, dest):
source_container=source_container, dest_container=dest_container
))

source_lease_id = self.connection.acquire_blob_lease(source_container, source_blob)
destination_lease_id = self.connection.acquire_blob_lease(dest_container, dest_blob) if self.exists(dest) else None
source_blob_client = self.blob_client(source_container, source_blob)
dest_blob_client = self.blob_client(dest_container, dest_blob)
source_lease = source_blob_client.acquire_lease()
destination_lease = dest_blob_client.acquire_lease() if self.exists(dest) else None

Check warning on line 195 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L192-L195

Added lines #L192 - L195 were not covered by tests
try:
return self.connection.copy_blob(source_container, dest_blob, self.connection.make_blob_url(
source_container, source_blob),
destination_lease_id=destination_lease_id, source_lease_id=source_lease_id)
return dest_blob_client.start_copy_from_url(source_url=source_blob_client.url,

Check warning on line 197 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L197

Added line #L197 was not covered by tests
source_lease=source_lease,
destination_lease=destination_lease)
finally:
self.connection.release_blob_lease(source_container, source_blob, source_lease_id)
if destination_lease_id is not None:
self.connection.release_blob_lease(dest_container, dest_blob, destination_lease_id)
source_lease.release()
if destination_lease is not None:
destination_lease.release()

Check warning on line 203 in luigi/contrib/azureblob.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/azureblob.py#L201-L203

Added lines #L201 - L203 were not covered by tests

def rename_dont_move(self, path, dest):
self.move(path, dest)
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/install_start_azurite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

echo "$DOCKERHUB_TOKEN" | docker login -u spotifyci --password-stdin

docker pull arafato/azurite
docker pull mcr.microsoft.com/azure-storage/azurite
mkdir -p blob_emulator
$1/stop_azurite.sh
docker run -e executable=blob -d -t -p 10000:10000 -v blob_emulator:/opt/azurite/folder arafato/azurite
docker run -p 10000:10000 -v blob_emulator:/data -d mcr.microsoft.com/azure-storage/azurite azurite-blob -l /data --blobHost 0.0.0.0 --blobPort 10000
2 changes: 1 addition & 1 deletion scripts/ci/stop_azurite.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/usr/bin/env bash
docker stop $(docker ps -q --filter ancestor=arafato/azurite)
docker stop "$(docker ps -q --filter ancestor=mcr.microsoft.com/azure-storage/azurite)"
23 changes: 16 additions & 7 deletions test/contrib/azureblob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@

import luigi
from luigi.contrib.azureblob import AzureBlobClient, AzureBlobTarget
from luigi.target import FileAlreadyExists

account_name = os.environ.get("ACCOUNT_NAME")
account_key = os.environ.get("ACCOUNT_KEY")
sas_token = os.environ.get("SAS_TOKEN")
is_emulated = False if account_name else True
client = AzureBlobClient(account_name, account_key, sas_token, is_emulated=is_emulated)
account_name = os.environ.get("AZURITE_ACCOUNT_NAME")
account_key = os.environ.get("AZURITE_ACCOUNT_KEY")
sas_token = os.environ.get("AZURITE_SAS_TOKEN")
custom_domain = os.environ.get("AZURITE_CUSTOM_DOMAIN")
protocol = os.environ.get("AZURITE_PROTOCOL", "http")
client = AzureBlobClient(account_name, account_key, sas_token, custom_domain=custom_domain, protocol=protocol)


@pytest.mark.azureblob
Expand Down Expand Up @@ -95,8 +97,15 @@ def test_upload_copy_move_remove_blob(self):
self.client.upload(f.name, container_name, from_blob_name)
self.assertTrue(self.client.exists(from_path))

# mkdir
self.assertRaises(FileAlreadyExists, self.client.mkdir, from_path, False, True)

# mkdir does not actually create anything
self.client.mkdir(to_path, True, True)
self.assertFalse(self.client.exists(to_path))

# copy
self.assertIn(self.client.copy(from_path, to_path).status, ["success", "pending"])
self.assertIn(self.client.copy(from_path, to_path)["copy_status"], ["success", "pending"])
self.assertTrue(self.client.exists(to_path))

# remove
Expand All @@ -121,7 +130,7 @@ def output(self):
return AzureBlobTarget("luigi-test", "movie-cheesy.txt", client, download_when_reading=False)

def run(self):
client.connection.create_container("luigi-test")
client.create_container("luigi-test")
with self.output().open("w") as op:
op.write("I'm going to make him an offer he can't refuse.\n")
op.write("Toto, I've got a feeling we're not in Kansas anymore.\n")
Expand Down
7 changes: 5 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ deps =
pymongo==3.4.0
toml<2.0.0
responses<1.0.0
azure-storage<=0.36
azure-storage-blob<=12.20.0
datadog==0.22.0
prometheus-client>=0.5.0<0.15
dropbox: dropbox>=11.0.0
Expand All @@ -75,6 +75,9 @@ setenv =
AWS_DEFAULT_REGION=us-east-1
AWS_ACCESS_KEY_ID=accesskey
AWS_SECRET_ACCESS_KEY=secretkey
AZURITE_ACCOUNT_NAME=devstoreaccount1
AZURITE_ACCOUNT_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
AZURITE_CUSTOM_DOMAIN=localhost:10000
commands =
cdh,hdp: {toxinidir}/scripts/ci/setup_hadoop_env.sh
azureblob: {toxinidir}/scripts/ci/install_start_azurite.sh {toxinidir}/scripts/ci
Expand Down Expand Up @@ -137,7 +140,7 @@ deps =
jinja2==3.0.3
Sphinx>=1.4.4,<1.5
sphinx_rtd_theme
azure-storage<=0.36
azure-storage-blob<=12.20.0
prometheus-client==0.5.0
alabaster<0.7.13
commands =
Expand Down

0 comments on commit 56dc5e1

Please sign in to comment.