Skip to content

Commit

Permalink
Get data refactor (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
savente93 authored Sep 5, 2023
1 parent 5ec6e50 commit 0280c20
Show file tree
Hide file tree
Showing 14 changed files with 836 additions and 503 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ dask-worker-space/

#ruff linting
.ruff_cache
.envrc
2 changes: 1 addition & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Added
- Add support for reading model configs in ``TOML`` format. (PR #444)
- new ``force-overwrite`` option in ``hydromt update`` CLI to force overwriting updated netcdf files. (PR #460)
- add ``open_mfcsv`` function in ``io`` module for combining multiple CSV files into one dataset. (PR #486)
- Adapters can now clip data that is passed through a python object the same way as through the data catalog. (PR #481)

Changed
-------
Expand All @@ -29,7 +30,6 @@ Changed
Fixed
-----
- when a model component (e.g. maps, forcing, grid) is updated using the set_ methods, it will first be read to avoid losing data. (PR #460)
-

Deprecated
----------
Expand Down
125 changes: 31 additions & 94 deletions hydromt/data_adapter/data_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
from string import Formatter
from typing import Optional

import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
import yaml
from fsspec.implementations import local
from pyproj import CRS
from upath import UPath

from .. import _compat, gis_utils
from .. import _compat

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -239,80 +237,11 @@ def __eq__(self, other: object) -> bool:
else:
return False

def _parse_zoom_level(
def _resolve_paths(
self,
zoom_level: int | tuple = None,
geom: gpd.GeoSeries = None,
bbox: list = None,
logger=logger,
) -> int:
"""Return nearest smaller zoom level.
Based on zoom resolutions defined in data catalog.
"""
# common pyproj crs axis units
known_units = ["degree", "metre", "US survey foot"]
if self.zoom_levels is None or len(self.zoom_levels) == 0:
logger.warning("No zoom levels available, default to zero")
return 0
zls = list(self.zoom_levels.keys())
if zoom_level is None: # return first zoomlevel (assume these are ordered)
return next(iter(zls))
# parse zoom_level argument
if (
isinstance(zoom_level, tuple)
and isinstance(zoom_level[0], (int, float))
and isinstance(zoom_level[1], str)
and len(zoom_level) == 2
):
res, unit = zoom_level
# covert 'meter' and foot to official pyproj units
unit = {"meter": "metre", "foot": "US survey foot"}.get(unit, unit)
if unit not in known_units:
raise TypeError(
f"zoom_level unit {unit} not understood;"
f" should be one of {known_units}"
)
elif not isinstance(zoom_level, int):
raise TypeError(
f"zoom_level argument not understood: {zoom_level}; should be a float"
)
else:
return zoom_level
if self.crs:
# convert res if different unit than crs
crs = CRS.from_user_input(self.crs)
crs_unit = crs.axis_info[0].unit_name
if crs_unit != unit and crs_unit not in known_units:
raise NotImplementedError(
f"no conversion available for {unit} to {crs_unit}"
)
if unit != crs_unit:
lat = 0
if bbox is not None:
lat = (bbox[1] + bbox[3]) / 2
elif geom is not None:
lat = geom.to_crs(4326).centroid.y.item()
conversions = {
"degree": np.hypot(*gis_utils.cellres(lat=lat)),
"US survey foot": 0.3048,
}
res = res * conversions.get(unit, 1) / conversions.get(crs_unit, 1)
# find nearest smaller zoomlevel
eps = 1e-5 # allow for rounding errors
smaller = [x < (res + eps) for x in self.zoom_levels.values()]
zl = zls[-1] if all(smaller) else zls[max(smaller.index(False) - 1, 0)]
logger.info(f"Getting data for zoom_level {zl} based on res {zoom_level}")
return zl

def resolve_paths(
self,
time_tuple: tuple = None,
variables: list = None,
zoom_level: int | tuple = None,
geom: gpd.GeoSeries = None,
bbox: list = None,
logger=logger,
time_tuple: Optional[tuple] = None,
variables: Optional[list] = None,
zoom_level: int = 0,
**kwargs,
):
"""Resolve {year}, {month} and {variable} keywords in self.path.
Expand All @@ -326,22 +255,18 @@ def resolve_paths(
:py:func:`pandas.to_timedelta`, by default None
variables : list of str, optional
List of variable names, by default None
zoom_level : int | tuple, optional
zoom level of dataset, can be provided as tuple of
(<zoom resolution>, <unit>)
zoom_level : int
Parsed zoom level to use, by default 0
See :py:meth:`RasterDataAdapter._parse_zoom_level` for more info
logger:
The logger to use. If none is provided, the default logger will be used.
**kwargs
key-word arguments are passed to fsspec FileSystem objects. Arguments
depend on protocol (local, gcs, s3...).
geom:
A geoSeries describing the geometries.
bbox:
A list of bounding boxes.
logger:
The logger to use. If none is provided, the default logger will be used.
Returns
-------
List:
fns: list of str
list of filenames matching the path pattern given date range and variables
"""
known_keys = ["year", "month", "zoom_level", "variable"]
Expand All @@ -365,6 +290,7 @@ def resolve_paths(
else:
path = path + key_str
keys.append(key)

# resolve dates: month & year keys
dates, vrs, postfix = [None], [None], ""
if time_tuple is not None:
Expand All @@ -375,21 +301,16 @@ def resolve_paths(
postfix += "; date range: " + " - ".join([t.strftime(strf) for t in trange])
# resolve variables
if variables is not None:
variables = np.atleast_1d(variables).tolist()
mv_inv = {v: k for k, v in self.rename.items()}
vrs = [mv_inv.get(var, var) for var in variables]
postfix += f"; variables: {variables}"
# parse zoom level
if "zoom_level" in keys:
# returns the first zoom_level if zoom_level is None
zoom_level = self._parse_zoom_level(
zoom_level=zoom_level, bbox=bbox, geom=geom, logger=logger
)

# get filenames with glob for all date / variable combinations
fs = self.get_filesystem(**kwargs)
fmt = {}
# update based on zoomlevel (size = 1)
if zoom_level is not None:
if "zoom_level" in keys:
fmt.update(zoom_level=zoom_level)
# update based on dates and variables (size >= 1)
for date, var in product(dates, vrs):
Expand All @@ -398,6 +319,7 @@ def resolve_paths(
if var is not None:
fmt.update(variable=var)
fns.extend(fs.glob(path.format(**fmt)))

if len(fns) == 0:
raise FileNotFoundError(f"No such file found: {path}{postfix}")

Expand All @@ -409,7 +331,8 @@ def resolve_paths(
last_parent = UPath(path).parents[-1]
# add the rest of the path
fns = [last_parent.joinpath(*UPath(fn).parts[1:]) for fn in fns]
return list(set(fns)) # return unique paths
fns = list(set(fns)) # return unique paths
return fns

def get_filesystem(self, **kwargs):
"""Return an initialised filesystem object."""
Expand Down Expand Up @@ -445,3 +368,17 @@ def get_data(self, bbox, geom, buffer):
If bbox of mask are given, clip data to that extent.
"""

@staticmethod
def _single_var_as_array(ds, single_var_as_array, variable_name=None):
    """Unwrap a dataset holding exactly one data variable, if requested.

    Parameters
    ----------
    ds :
        Object exposing a ``data_vars`` mapping and item access by variable
        name (presumably an ``xarray.Dataset`` -- confirm against callers).
    single_var_as_array : bool
        When falsy, ``ds`` is always returned unchanged.
    variable_name : str or list of str, optional
        New name for the returned array. A string is used directly; a list is
        only used when it has exactly one element. Any other value leaves the
        array's name untouched.

    Returns
    -------
    The single data variable of ``ds`` (optionally renamed) when unwrapping
    was requested and ``ds`` has exactly one data variable; otherwise ``ds``.
    """
    names = list(ds.data_vars.keys())
    # Guard clause: only unwrap when asked to and when it is unambiguous.
    if not single_var_as_array or len(names) != 1:
        return ds

    arr = ds[names[0]]
    if isinstance(variable_name, str):
        arr.name = variable_name
    elif isinstance(variable_name, list) and len(variable_name) == 1:
        arr.name = variable_name[0]
    return arr
Loading

0 comments on commit 0280c20

Please sign in to comment.