Skip to content

Commit

Permalink
Merge pull request #23 from leftfield-geospatial/feature_tile_downloa…
Browse files Browse the repository at this point in the history
…d_retry

Feature tile download retry
  • Loading branch information
dugalh authored Jun 21, 2024
2 parents 6f7f062 + 03d7eaf commit d47f749
Show file tree
Hide file tree
Showing 11 changed files with 881 additions and 471 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/install-test-conda-forge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Install package
run: |
conda info
conda install geedim>=1.7.3
conda install geedim>=1.8.0
conda list
- name: Run tests
Expand Down
439 changes: 307 additions & 132 deletions geedim/cli.py

Large diffs are not rendered by default.

159 changes: 102 additions & 57 deletions geedim/download.py

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions geedim/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@


class GeedimError(Exception):
""" Base exception class. """
"""Base exception class."""


class UnfilteredError(GeedimError):
""" Raised when attempting to retrieve the properties of an unfiltered image collection. """
"""Raised when attempting to retrieve the properties of an unfiltered image collection."""


class InputImageError(GeedimError):
""" Raised when there is a problem with the images making up a collection. """
"""Raised when there is a problem with the images making up a collection."""


class TileError(GeedimError):
"""Raised when there is a problem downloading an image tile."""
196 changes: 123 additions & 73 deletions geedim/tile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@
limitations under the License.
"""

import logging
import threading
import time
import zipfile
from io import BytesIO
import threading

import numpy as np
import requests
import rasterio as rio
import requests
from rasterio import Affine, MemoryFile
from rasterio.windows import Window
from requests.exceptions import RequestException, JSONDecodeError
from tqdm.auto import tqdm

from geedim import utils
from geedim.errors import TileError

logger = logging.getLogger(__name__)


class Tile:
Expand All @@ -51,97 +58,140 @@ def __init__(self, exp_image, window: Window):

@property
def window(self) -> Window:
""" rasterio tile window into the source image. """
"""rasterio tile window into the source image."""
return self._window

def _get_download_url_response(self, session=None):
""" Get tile download url and response. """
session = session if session else requests
with self._ee_lock:
url = self._exp_image.ee_image.getDownloadURL(
dict(
crs=self._exp_image.crs, crs_transform=tuple(self._transform)[:6], dimensions=self._shape[::-1],
filePerBand=False, fileFormat='GeoTIFF'
)
)
return session.get(url, stream=True, timeout=(30, 300)), url
@staticmethod
def _raise_for_status(response: requests.Response):
"""Raise a TileError if the tile cannot be downloaded."""
download_size = int(response.headers.get('content-length', 0))
if download_size == 0 or not response.status_code == 200:
msg = f'Error downloading tile: {response.status_code} - {response.reason}. URL: {response.url}.'
try:
resp_dict = response.json()
if 'error' in resp_dict and 'message' in resp_dict['error']:
# raise an exception with the response error message
msg = resp_dict['error']['message']
msg = f'Error downloading tile: {msg} URL: {response.url}.'
if 'user memory limit exceeded' in msg.lower():
msg += (
'\nThe `max_tile_size` or `max_tile_dim` parameters can be decreased to work around this '
'error. Alternatively you can export to Earth Engine asset, and then download the asset '
'image.'
)
except JSONDecodeError:
pass

raise TileError(msg)

def _download_to_array(self, url: str, session: requests.Session = None, bar: tqdm = None) -> np.ndarray:
"""Download the image tile into a numpy array."""
# get image download response
session = session or requests
response = session.get(url, stream=True, timeout=(30, 300))

# raise a TileError if the tile cannot be downloaded
self._raise_for_status(response)

# find raw download size
download_size = int(response.headers.get('content-length', 0))
dtype_size = np.dtype(self._exp_image.dtype).itemsize
raw_download_size = self._shape[0] * self._shape[1] * self._exp_image.count * dtype_size

# download and unzip the tile
downloaded_size = 0
try:
# download zip into buffer
zip_buffer = BytesIO()
for data in response.iter_content(chunk_size=10240):
if bar:
# update with raw download progress (0-1)
bar.update(raw_download_size * (len(data) / download_size))
zip_buffer.write(data)
downloaded_size += len(data)
zip_buffer.flush()

# extract GeoTIFF bytes from zipped buffer
with zipfile.ZipFile(zip_buffer, 'r') as zip_file:
gtiff_bytes = zip_file.read(zip_file.filelist[0])

except (RequestException, zipfile.BadZipfile):
if bar:
# reverse progress bar
bar.update(-raw_download_size * (downloaded_size / download_size))
pass
raise

# read the tile array from the GeoTIFF bytes, via a rasterio memory file
env = rio.Env(GDAL_NUM_THREADS='ALL_CPUs', GTIFF_FORCE_RGBA=False)
with utils.suppress_rio_logs(), env, MemoryFile(gtiff_bytes) as mem_file:
with mem_file.open() as ds:
array = ds.read()
if (array.dtype == np.dtype('float32')) or (array.dtype == np.dtype('float64')):
# GEE sets nodata to -inf for float data types, (but does not populate the nodata field).
# rasterio won't allow nodata=-inf, so this is a workaround to change nodata to nan at
# source.
array[np.isinf(array)] = np.nan
return array

def download(self, session=None, response=None, bar: tqdm = None):
def download(
self,
session: requests.Session = None,
bar: tqdm = None,
max_retries: int = 5,
backoff_factor: float = 2.0,
) -> np.ndarray:
"""
Download the image tile into a numpy array.
Parameters
----------
session: requests.Session, optional
requests session to use for downloading
response: requests.Response, optional
Response to a get request on the tile download url.
bar: tqdm, optional
tqdm progress bar instance to update with incremental (0-1) download progress.
max_retries: int, optional
Number of times to retry downloading the tile. This is independent of the ``session``, which may have its
own retry configuration.
backoff_factor: float, optional
Backoff factor to apply between tile download retries. The delay between retries is: {backoff_factor} *
(2 ** ({number of previous retries})) seconds. This is independent of the ``session``, which may have its
own retry configuration.
Returns
-------
array: numpy.ndarray
3D numpy array of the tile pixel data with bands down the first dimension.
"""
session = session or requests

# get image download url and response
if response is None:
response, url = self._get_download_url_response(session=session)

# find raw and actual download sizes
dtype_size = np.dtype(self._exp_image.dtype).itemsize
raw_download_size = self._shape[0] * self._shape[1] * self._exp_image.count * dtype_size
download_size = int(response.headers.get('content-length', 0))

if download_size == 0 or not response.status_code == 200:
resp_dict = response.json()
if 'error' in resp_dict and 'message' in resp_dict['error']:
msg = resp_dict['error']['message']
ex_msg = f'Error downloading tile: {msg}'
if 'user memory limit exceeded' in msg.lower():
ex_msg += (
'\nThe `max_tile_size` or `max_tile_dim` parameters can be decreased to work around this '
'error. Alternatively you can export to Earth Engine asset, and then download the asset image.'
)
else:
ex_msg = str(response.json())
raise IOError(ex_msg)

# download zip into buffer
zip_buffer = BytesIO()
downloaded_size = 0
with response:
for data in response.iter_content(chunk_size=10240):
zip_buffer.write(data)
if bar is not None:
# update with raw download progress (0-1)
bar.update(raw_download_size * (len(data) / download_size))
downloaded_size += len(data)
zip_buffer.flush()

# check downloaded size matches the header content-length
if downloaded_size < download_size:
raise IOError(
f'Incomplete read: {downloaded_size} bytes read, {download_size-downloaded_size} '
f'more expected. URL: {response.url}.'
# get download URL
with self._ee_lock:
url = self._exp_image.ee_image.getDownloadURL(
dict(
crs=self._exp_image.crs,
crs_transform=tuple(self._transform)[:6],
dimensions=self._shape[::-1],
filePerBand=False,
fileFormat='GeoTIFF',
)
)

# extract geotiff from zipped buffer into another buffer
try:
zip_file = zipfile.ZipFile(zip_buffer)
ext_buffer = BytesIO(zip_file.read(zip_file.filelist[0]))
except zipfile.BadZipfile as ex:
raise IOError(f'Bad tile zip from URL: {response.url}.') from ex

# read the geotiff with a rasterio memory file
env = rio.Env(GDAL_NUM_THREADS='ALL_CPUs', GTIFF_FORCE_RGBA=False)
with utils.suppress_rio_logs(), env, MemoryFile(ext_buffer) as mem_file:
with mem_file.open() as ds:
array = ds.read()
if (array.dtype == np.dtype('float32')) or (array.dtype == np.dtype('float64')):
# GEE sets nodata to -inf for float data types, (but does not populate the nodata field).
# rasterio won't allow nodata=-inf, so this is a workaround to change nodata to nan at source.
array[np.isinf(array)] = np.nan
# download and read the tile, with retries
array = None
for retry in range(max_retries + 1):
try:
array = self._download_to_array(url, session=session, bar=bar)
break

except (RequestException, zipfile.BadZipfile) as ex:
if retry < max_retries:
# retry tile download
time.sleep(backoff_factor * (2**retry))
logger.warning(
f'Tile downloaded failed, retry {retry + 1} of {max_retries}. URL: {url}. {str(ex)}.'
)
else:
raise TileError(f'Tile download failed, reached the maximum retries. URL: {url}.') from ex

return array
Loading

0 comments on commit d47f749

Please sign in to comment.