Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get data refactor #481

Merged
merged 37 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2a3c7fa
refactor rasterdataset into functions
savente93 Aug 16, 2023
70779f3
refactor geodataset into functions
savente93 Aug 17, 2023
66fc91e
refactor dataframe, geodataframe, rasterdataset
savente93 Aug 17, 2023
3bd7964
return predicate in geodataframe
savente93 Aug 17, 2023
5a75014
refactor geodataframe functions
savente93 Aug 17, 2023
5515647
[no ci] WIP
savente93 Aug 17, 2023
4ae4fe4
refactor checkpoint
savente93 Aug 18, 2023
cf06256
refactor geodataset checkpoint
savente93 Aug 18, 2023
ba3a373
checkpoint
savente93 Aug 18, 2023
069fae0
rename some functions
savente93 Aug 18, 2023
464a731
make data slice methods static for use in data cat
savente93 Aug 18, 2023
6da008b
fix text
savente93 Aug 18, 2023
5ec1206
make sonar cloud a bit happier
savente93 Aug 18, 2023
67886ae
increase timeout slightly
savente93 Aug 21, 2023
7910777
wip
savente93 Aug 21, 2023
37861b9
add kwargs back in on geodataframe read data
savente93 Aug 21, 2023
3bf2aab
delete stray debug file
savente93 Aug 21, 2023
ee55725
remove failfast in ci
savente93 Aug 21, 2023
747cf1f
Merge branch 'main' into get-data-refactor
savente93 Aug 21, 2023
6501fca
move dask config to conftest instead of single test
savente93 Aug 21, 2023
e678151
remove envrc
savente93 Aug 21, 2023
cefffad
add extra and io to testing
savente93 Aug 21, 2023
a573fa0
Merge branch 'main' into get-data-refactor
savente93 Aug 21, 2023
31317c7
remove tmpdir
savente93 Aug 21, 2023
b0b4dc4
update changelog
savente93 Aug 21, 2023
fe6b98a
Merge branch 'main' into get-data-refactor
savente93 Aug 24, 2023
841c485
Merge branch 'main' into get-data-refactor
savente93 Aug 24, 2023
7163da6
fix typo
DirkEilander Aug 29, 2023
d633fd3
Merge branch 'main' into get-data-refactor
savente93 Aug 30, 2023
5eca16a
uniformize slice data interface for the adapters
savente93 Aug 30, 2023
2f4c763
make geodataframe slice_data a bit safer to use
savente93 Aug 30, 2023
8cdb648
continue restructure get_data methods; IndexError if no data #181; wa…
DirkEilander Sep 1, 2023
3a04886
fix type annotation
DirkEilander Sep 1, 2023
18fc309
add tests
DirkEilander Sep 1, 2023
fdf4bfc
fix logging
DirkEilander Sep 1, 2023
377b71a
fix clip error
DirkEilander Sep 1, 2023
da1bc6a
call static methods as static
savente93 Sep 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ dask-worker-space/

#ruff linting
.ruff_cache
.envrc
2 changes: 1 addition & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Added
- Add support for reading model configs in ``TOML`` format. (PR #444)
- new ``force-overwrite`` option in ``hydromt update`` CLI to force overwriting updated netcdf files. (PR #460)
- add ``open_mfcsv`` function in ``io`` module for combining multiple CSV files into one dataset. (PR #486)
- Adapters can now clip data that is passed through a python object the same way as through the data catalog. (PR #481)

Changed
-------
Expand All @@ -28,7 +29,6 @@ Changed
Fixed
-----
- when a model component (eg maps, forcing, grid) is updated using the set_ methods, it will first be read to avoid losing data. (PR #460)
-

Deprecated
----------
Expand Down
84 changes: 63 additions & 21 deletions hydromt/data_adapter/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,48 @@ def get_data(
based on the properties of this DataFrameAdapter. For a detailed
description see: :py:func:`~hydromt.data_catalog.DataCatalog.get_dataframe`
"""
kwargs = self._parse_args()
df = self._load_data(variables, **kwargs)
df = DataFrameAdapter.slice_temporal_dimension(df, time_tuple)
df = self._uniformize_data(df)
return df

def _load_data(self, variables, **kwargs):
df = self._read_data(**kwargs)
df = self._rename_vars(df, variables)
return df

def _uniformize_data(self, df):
df = self._apply_unit_conversion(df)
df = self._set_meta_data(df)
return df

@staticmethod
def slice_temporal_dimension(df, time_tuple):
"""Return a sliced DataFrame.

Parameters
----------
df : pd.DataFrame
the dataframe to be sliced.
time_tuple : tuple of str, datetime, optional
Start and end date of period of interest. By default the entire time period
of the dataset is returned.

Returns
-------
pd.DataFrame
Tabular data
"""
if time_tuple is not None and np.dtype(df.index).type == np.datetime64:
logger.debug(f"DataFrame: Slicing time dime {time_tuple}")
df = df[df.index.slice_indexer(*time_tuple)]
if df.size == 0:
raise IndexError("DataFrame: Time slice out of range.")

return df

def _parse_args(self):
# Extract storage_options from kwargs to instantiate fsspec object correctly
so_kwargs = {}
if "storage_options" in self.driver_kwargs:
Expand All @@ -209,13 +251,14 @@ def get_data(
_ = self.resolve_paths(**so_kwargs) # throw nice error if data not found

kwargs = self.driver_kwargs.copy()
return kwargs

# read and clip
def _read_data(self, **kwargs):
logger.info(f"DataFrame: Read {self.driver} data.")

if self.driver in ["csv"]:
df = pd.read_csv(self.path, **kwargs)
elif self.driver == "parquet":
_ = kwargs.pop("index_col", None)
df = pd.read_parquet(self.path, **kwargs)
elif self.driver in ["xls", "xlsx", "excel"]:
df = pd.read_excel(self.path, engine="openpyxl", **kwargs)
Expand All @@ -224,16 +267,21 @@ def get_data(
else:
raise IOError(f"DataFrame: driver {self.driver} unknown.")

# rename and select columns
return df

def _rename_vars(self, df, variables):
if self.rename:
rename = {k: v for k, v in self.rename.items() if k in df.columns}
df = df.rename(columns=rename)

if variables is not None:
if np.any([var not in df.columns for var in variables]):
raise ValueError(f"DataFrame: Not all variables found: {variables}")
df = df.loc[:, variables]

# nodata and unit conversion for numeric data
return df

def _apply_unit_conversion(self, df):
if df.index.size == 0:
logger.warning(f"DataFrame: No data within spatial domain {self.path}.")
else:
Expand All @@ -250,24 +298,18 @@ def get_data(
is_nodata = np.isin(df[c], np.atleast_1d(mv))
df[c] = np.where(is_nodata, np.nan, df[c])

# unit conversion
unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys())
unit_names = [k for k in unit_names if k in df.columns]
if len(unit_names) > 0:
logger.debug(f"DataFrame: Convert units for {len(unit_names)} columns.")
for name in list(set(unit_names)): # unique
m = self.unit_mult.get(name, 1)
a = self.unit_add.get(name, 0)
df[name] = df[name] * m + a

# clip time slice
if time_tuple is not None and np.dtype(df.index).type == np.datetime64:
logger.debug(f"DataFrame: Slicing time dime {time_tuple}")
df = df[df.index.slice_indexer(*time_tuple)]
if df.size == 0:
raise IndexError("DataFrame: Time slice out of range.")
unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys())
unit_names = [k for k in unit_names if k in df.columns]
if len(unit_names) > 0:
logger.debug(f"DataFrame: Convert units for {len(unit_names)} columns.")
for name in list(set(unit_names)): # unique
m = self.unit_mult.get(name, 1)
a = self.unit_add.get(name, 0)
df[name] = df[name] * m + a

return df

# set meta data
def _set_meta_data(self, df):
df.attrs.update(self.meta)

# set column attributes
Expand Down
193 changes: 133 additions & 60 deletions hydromt/data_adapter/geodataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,83 +212,64 @@ def get_data(
buffer=0,
logger=logger,
variables=None,
# **kwargs, # this is not used, for testing only
):
"""Return a clipped and unified GeoDataFrame (vector).

For a detailed description see:
:py:func:`~hydromt.data_catalog.DataCatalog.get_geodataframe`
"""
# If variable is string, convert to list
if variables:
variables = np.atleast_1d(variables).tolist()

if "storage_options" in self.driver_kwargs:
# not sure if storage options can be passed to fiona.open()
# for now throw NotImplemented Error
raise NotImplementedError(
"Remote file storage_options not implemented for GeoDataFrame"
)
_ = self.resolve_paths() # throw nice error if data not found
variables, clip_str, geom, predicate, kwargs = self._parse_args(
variables, geom, bbox, buffer, predicate
)
gdf = self._load_data(clip_str, geom, predicate, **kwargs)
gdf = self.slice_data(gdf, variables, geom, predicate)
gdf = self._uniformize_data(gdf)

kwargs = self.driver_kwargs.copy()
# parse geom, bbox and buffer arguments
clip_str = ""
if geom is None and bbox is not None:
# convert bbox to geom with crs EPGS:4326 to apply buffer later
geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326)
clip_str = " and clip to bbox (epsg:4326)"
elif geom is not None:
clip_str = f" and clip to geom (epsg:{geom.crs.to_epsg():d})"
if geom is not None:
# make sure geom is projected > buffer in meters!
if geom.crs.is_geographic and buffer > 0:
geom = geom.to_crs(3857)
geom = geom.buffer(buffer) # a buffer with zero fixes some topology errors
bbox_str = ", ".join([f"{c:.3f}" for c in geom.total_bounds])
clip_str = f"{clip_str} [{bbox_str}]"
if kwargs.pop("within", False): # for backward compatibility
predicate = "contains"
return gdf

# read and clip
logger.info(f"GeoDataFrame: Read {self.driver} data{clip_str}.")
if self.driver in [
"csv",
"parquet",
"xls",
"xlsx",
"xy",
"vector",
"vector_table",
]:
# "csv", "xls", "xlsx", "xy" deprecated use vector_table instead.
# specific driver should be added to open_vector kwargs
if "driver" not in kwargs and self.driver in ["csv", "xls", "xlsx", "xy"]:
warnings.warn(
"using the driver setting is deprecated. Please use"
"vector_table instead."
)
@staticmethod
def slice_data(gdf, variables, geom, predicate):
"""Return a clipped GeoDataFrame (vector).

kwargs.update(driver=self.driver)
# Check if file-object is required because of additional options
gdf = io.open_vector(
self.path, crs=self.crs, geom=geom, predicate=predicate, **kwargs
)
else:
raise ValueError(f"GeoDataFrame: driver {self.driver} unknown.")
Arguments
---------
geom : geopandas.GeoDataFrame/Series,
A geometry defining the area of interest.
predicate : {'intersects', 'within', 'contains', 'overlaps',
'crosses', 'touches'}, optional If predicate is provided,
the GeoDataFrame is filtered by testing the predicate function
against each item. Requires bbox or mask. By default 'intersects'
variables : str or list of str, optional.
Names of GeoDataFrame columns to return.

# rename and select columns
if self.rename:
rename = {k: v for k, v in self.rename.items() if k in gdf.columns}
gdf = gdf.rename(columns=rename)
Returns
-------
gdf: geopandas.GeoDataFrame
GeoDataFrame
"""
if variables is not None:
if np.any([var not in gdf.columns for var in variables]):
raise ValueError(f"GeoDataFrame: Not all variables found: {variables}")
if "geometry" not in variables: # always keep geometry column
variables = variables + ["geometry"]
gdf = gdf.loc[:, variables]

# nodata and unit conversion for numeric data
if geom is not None:
gdf = gdf.sjoin(
gpd.GeoDataFrame(geometry=geom).to_crs(gdf.crs), predicate=predicate
)
if "index_right" in gdf.columns:
gdf = gdf.drop("index_right", axis=1)

return gdf

def _uniformize_data(self, gdf):
# rename and select columns
if self.rename:
rename = {k: v for k, v in self.rename.items() if k in gdf.columns}
gdf = gdf.rename(columns=rename)

# nodata and unit conversion
if gdf.index.size == 0:
logger.warning(f"GeoDataFrame: No data within spatial domain {self.path}.")
else:
Expand Down Expand Up @@ -324,4 +305,96 @@ def get_data(
for col in self.attrs:
if col in gdf.columns:
gdf[col].attrs.update(**self.attrs[col])

return gdf

def _load_data(self, clip_str, geom, predicate, **kwargs):
# read and clip
logger.info(f"GeoDataFrame: Read {self.driver} data{clip_str}.")
if self.driver in [
"csv",
"parquet",
"xls",
"xlsx",
"xy",
"vector",
"vector_table",
]:
# "csv", "xls", "xlsx", "xy" deprecated use vector_table instead.
# specific driver should be added to open_vector kwargs
if "driver" not in kwargs and self.driver in ["csv", "xls", "xlsx", "xy"]:
warnings.warn(
"using the driver setting is deprecated. Please use"
"vector_table instead."
)

kwargs.update(driver=self.driver)
# Check if file-object is required because of additional options
gdf = io.open_vector(
self.path, crs=self.crs, geom=geom, predicate=predicate, **kwargs
)
else:
raise ValueError(f"GeoDataFrame: driver {self.driver} unknown.")

return gdf

def _parse_args(self, variables, geom, bbox, buffer, predicate):
    """Normalize the user-facing arguments of ``get_data``.

    Converts a scalar ``variables`` to a list, rejects unsupported
    storage options, resolves paths early for a clear error, turns
    ``bbox``/``geom`` into a single geometry plus a logging string,
    and honors the deprecated ``within`` driver keyword.
    """
    # a single variable name becomes a one-element list
    if variables:
        variables = np.atleast_1d(variables).tolist()

    # not sure if storage options can be passed to fiona.open();
    # refuse them explicitly for now
    if "storage_options" in self.driver_kwargs:
        raise NotImplementedError(
            "Remote file storage_options not implemented for GeoDataFrame"
        )

    # throws a nice error if the data cannot be found
    self.resolve_paths()

    opts = self.driver_kwargs.copy()
    geom, clip_str = GeoDataFrameAdapter.parse_geom(geom, bbox, buffer)

    # backward compatibility: within=True used to mean predicate="contains"
    if opts.pop("within", False):
        predicate = "contains"

    return variables, clip_str, geom, predicate, opts

@staticmethod
def parse_geom(geom, bbox, buffer):
"""Parse geometries.

The geometry returned by this function can be
used by `GeoDataFrameAdapter.slice_data`.

Arguments
---------
geom : geopandas.GeoDataFrame/Series,
A geometry defining the area of interest.
bbox : array-like of floats
(xmin, ymin, xmax, ymax) bounding box of area of interest
(in WGS84 coordinates).
buffer : float, optional
Buffer around the `bbox` or `geom` area of interest in meters. By default 0.

Returns
-------
geom: geometry
the actual geometry
clip_str: str
the string representation of the geom to be used in logging.
"""
clip_str = ""
if geom is None and bbox is not None:
# convert bbox to geom with crs EPGS:4326 to apply buffer later
geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326)
clip_str = " and clip to bbox (epsg:4326)"
elif geom is not None:
clip_str = f" and clip to geom (epsg:{geom.crs.to_epsg():d})"
if geom is not None:
# make sure geom is projected > buffer in meters!
if geom.crs.is_geographic and buffer > 0:
geom = geom.to_crs(3857)
geom = geom.buffer(buffer) # a buffer with zero fixes some topology errors
bbox_str = ", ".join([f"{c:.3f}" for c in geom.total_bounds])
clip_str = f"{clip_str} [{bbox_str}]"

return geom, clip_str
Loading
Loading