diff --git a/docs/api.rst b/docs/api.rst index 74d1a6fb6..42228ec58 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -22,6 +22,8 @@ General :toctree: _generated data_catalog.DataCatalog + data_catalog.DataCatalog.get_source + data_catalog.DataCatalog.iter_sources data_catalog.DataCatalog.sources data_catalog.DataCatalog.keys data_catalog.DataCatalog.predefined_catalogs @@ -36,12 +38,13 @@ Add data sources .. autosummary:: :toctree: _generated - data_catalog.DataCatalog.set_predefined_catalogs + data_catalog.DataCatalog.add_source + data_catalog.DataCatalog.update data_catalog.DataCatalog.from_predefined_catalogs data_catalog.DataCatalog.from_archive data_catalog.DataCatalog.from_yml data_catalog.DataCatalog.from_dict - data_catalog.DataCatalog.update + data_catalog.DataCatalog.set_predefined_catalogs .. _api_data_catalog_get: @@ -54,7 +57,7 @@ Get data data_catalog.DataCatalog.get_rasterdataset data_catalog.DataCatalog.get_geodataset data_catalog.DataCatalog.get_geodataframe - + data_catalog.DataCatalog.get_dataframe RasterDataset diff --git a/docs/changelog.rst b/docs/changelog.rst index 670cfdbb5..17288458e 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -12,6 +12,7 @@ Unreleased Added ----- - docs now include a dropdown for selecting older versions of the docs. (#457) +- Support for loading the same data source but from different places (e.g. local & aws) Changed ------- diff --git a/docs/dev/dev_install.rst b/docs/dev/dev_install.rst index 35ce98d90..694e1a572 100644 --- a/docs/dev/dev_install.rst +++ b/docs/dev/dev_install.rst @@ -99,7 +99,7 @@ Finally, create a developer installation of HydroMT: see :ref:`installation guide ` for the difference between both. Fine tuned installation ----------------------- +----------------------- If you want a more fine tuned installation you can also specify exactly which dependency groups you'd like. For instance, this will create an environment diff --git a/docs/user_guide/data_prepare_cat.rst b/docs/user_guide/data_prepare_cat.rst index 3e162b48a..53494c267 100644 --- a/docs/user_guide/data_prepare_cat.rst +++ b/docs/user_guide/data_prepare_cat.rst @@ -31,6 +31,7 @@ The ``rename``, ``nodata``, ``unit_add`` and ``unit_mult`` options are set per v meta: root: /path/to/data_root/ version: version + name: data_catalog_name my_dataset: crs: EPSG/WKT data_type: RasterDataset/GeoDataset/GeoDataFrame/DataFrame @@ -48,8 +49,6 @@ The ``rename``, ``nodata``, ``unit_add`` and ``unit_mult`` options are set per v nodata: new_variable_name: value path: /absolut_path/to/my_dataset.extension OR relative_path/to_my_dataset.extension - placeholders: - [placeholder_key: [placeholder_values]] rename: old_variable_name: new_variable_name unit_add: @@ -91,6 +90,10 @@ A full list of **optional data source arguments** is given below - **filesystem** (required if different than local): specify if the data is stored locally or remotely (e.g cloud). Supported filesystems are *local* for local data, *gcs* for data stored on Google Cloud Storage, and *aws* for data stored on Amazon Web Services. Profile or authentication information can be passed to ``driver_kwargs`` via *storage_options*. +- **version** (recommended): data source version + *NOTE*: New in HydroMT v0.8.1 +- **provider** (recommended): data source provider + *NOTE*: New in HydroMT v0.8.1 - **meta** (recommended): additional information on the dataset organized in a sub-list. Good meta data includes a *source_url*, *source_license*, *source_version*, *paper_ref*, *paper_doi*, *category*, etc. 
These are added to the data attributes. Usual categories within HydroMT are *geography*, *topography*, *hydrography*, *meteo*, *landuse*, *ocean*, *socio-economic*, *observed data* @@ -103,23 +106,81 @@ A full list of **optional data source arguments** is given below - **unit_mult**: multiply the input data by a value for unit conversion (e.g. 1000 for conversion from m to mm of precipitation). - **attrs** (optional): This argument allows for setting attributes like the unit or long name to variables. *NOTE*: New in HydroMT v0.7.2 +- **placeholder** (optional): this argument can be used to generate multiple sources with a single entry in the data catalog file. If different files follow a logical + nomenclature, multiple data sources can be defined by iterating through all possible combinations of the placeholders. The placeholder names should be given in the + source name and the path and its values listed under the placeholder argument. +- **variants** (optional): This argument can be used to generate multiple sources with the same name, but from different providers or versions. + Any keys here are essentially used to extend/overwrite the base arguments. + +The following are **optional data source arguments** for *RasterDataset*, *GeoDataFrame*, and *GeoDataset*: + - **crs** (required if missing in the data): EPSG code or WKT string of the reference coordinate system of the data. Only used if not crs can be inferred from the input data. + +The following are **optional data source arguments** for *RasterDataset*: + - **zoom_level** (optional): this argument can be used for a *RasterDatasets* that contain multiple zoom levels of different resolution. It should contain a list of numeric zoom levels that correspond to the `zoom_level` key in file path, e.g., ``"path/to/my/files/{zoom_level}/data.tif"`` and corresponding resolution, expressed in the unit of the data crs. The *crs* argument is therefore required when using zoom_levels to correctly interpret the unit of the resolution. The required zoom level can be requested from HydroMT as argument to the `DataCatalog.get_rasterdataset` method, see `Reading tiled raster data with different zoom levels <../_examples/working_with_tiled_raster_data.ipynb>`_. -- **placeholder** (optional): this argument can be used to generate multiple sources with a single entry in the data catalog file. If different files follow a logical - nomenclature, multiple data sources can be defined by iterating through all possible combinations of the placeholders. The placeholder names should be given in the - source name and the path and its values listed under the placeholder argument. .. note:: - The **alias** argument will be deprecated and should no longer be used, see `github issue for more information `_ + The **alias** argument will be deprecated and should no longer be used, see + `github issue for more information `_ .. warning:: - Using cloud data is still experimental and only supported for *DataFrame*, *RasterDataset* and *Geodataset* with *zarr*. *RasterDataset* with *raster* driver is also possible + Using cloud data is still experimental and only supported for *DataFrame*, *RasterDataset* and + *Geodataset* with *zarr*. *RasterDataset* with *raster* driver is also possible but in case of multiple files (mosaic) we strongly recommend using a vrt file for speed and computation efficiency. + +Data variants +------------- + +Data variants are used to define multiple data sources with the same name, but from different providers or versions. 
+Below, we show an example of a data catalog for a RasterDataset with multiple variants of the same data source (esa_worldcover), +but this works identical for other data types. +Here, the *crs*, *data_type*, *driver* and *filesystem* are common arguments used for all variants. +The variant arguments are used to extend and/or overwrite the common arguments, creating new sources. + +.. code-block:: yaml + + esa_worldcover: + crs: 4326 + data_type: RasterDataset + driver: raster + filesystem: local + variants: + - provider: local + version: 2021 + path: landuse/esa_worldcover_2021/esa-worldcover.vrt + - provider: local + version: 2020 + path: landuse/esa_worldcover/esa-worldcover.vrt + - provider: aws + version: 2020 + path: s3://esa-worldcover/v100/2020/ESA_WorldCover_10m_2020_v100_Map_AWS.vrt + filesystem: s3 + + +To request a specific variant, the variant arguments can be used as keyword arguments +to the `DataCatalog.get_rasterdataset` method, see code below. +By default the newest version from the last provider is returned when requesting a data +source with specific version or provider. +Requesting a specific version from a HydroMT configuration file is also possible, see :ref:`model_config`. + +.. code-block:: python + + from hydromt import DataCatalog + dc = DataCatalog.from_yml("data_catalog.yml") + # get the default version. This will return the latest (2020) version from the last provider (aws) + ds = dc.get_rasterdataset("esa_worldcover") + # get a 2020 version. This will return the 2020 version from the last provider (aws) + ds = dc.get_rasterdataset("esa_worldcover", version=2020) + # get a 2021 version. This will return the 2021 version from the local provider as this verion is not available from aws . + ds = dc.get_rasterdataset("esa_worldcover", version=2021) + # get the 2020 version from the local provider + ds = dc.get_rasterdataset("esa_worldcover", version=2020, provider="local") diff --git a/docs/user_guide/model_config.rst b/docs/user_guide/model_config.rst index 0a21d0e02..c274ad757 100644 --- a/docs/user_guide/model_config.rst +++ b/docs/user_guide/model_config.rst @@ -54,3 +54,9 @@ An example .yaml file is shown below. Note that this .yaml file does not apply t setup_manning_roughness: lulc_fn: globcover # source name of landuse-landcover data mapping_fn: globcover_mapping # source name of mapping table converting lulc classes to N values + + setup_infiltration: + soil_fn: + source: soil_data # source name of soil data with specific version + version: 1.0 # version of soil data + mapping_fn: soil_mapping # source name of mapping table converting soil classes to infiltration parameters diff --git a/examples/reading_vector_data.ipynb b/examples/reading_vector_data.ipynb index 355630f8d..e9340ca7c 100644 --- a/examples/reading_vector_data.ipynb +++ b/examples/reading_vector_data.ipynb @@ -70,10 +70,9 @@ "metadata": {}, "outputs": [], "source": [ - "# supported file formats\n", - "import fiona\n", - "\n", - "print(list(fiona.supported_drivers.keys()))" + "# uncomment to see list of supported file formats\n", + "# import fiona\n", + "# print(list(fiona.supported_drivers.keys()))" ] }, { diff --git a/hydromt/cli/api.py b/hydromt/cli/api.py index 8d4aaa390..76b52434a 100644 --- a/hydromt/cli/api.py +++ b/hydromt/cli/api.py @@ -100,13 +100,12 @@ def get_datasets(data_libs: Union[List, str]) -> Dict: for accepted yaml format. 
""" data_catalog = DataCatalog(data_libs) - datasets = data_catalog.sources dataset_sources = { "RasterDatasetSource": [], "GeoDatasetSource": [], "GeoDataframeSource": [], } - for k, v in datasets.items(): + for k, v in data_catalog.iter_sources(): if v.data_type == "RasterDataset": dataset_sources["RasterDatasetSource"].append(k) elif v.data_type == "GeoDataFrame": @@ -167,7 +166,7 @@ def get_region( # retrieve global hydrography data (lazy!) ds_org = data_catalog.get_rasterdataset(hydrography_fn) if "bounds" not in region: - region.update(basin_index=data_catalog[basin_index_fn]) + region.update(basin_index=data_catalog.get_source(basin_index_fn)) # get basin geometry geom, xy = workflows.get_basin_geometry( ds=ds_org, diff --git a/hydromt/data_adapter/data_adapter.py b/hydromt/data_adapter/data_adapter.py index 10cf09d16..2ae7eea49 100644 --- a/hydromt/data_adapter/data_adapter.py +++ b/hydromt/data_adapter/data_adapter.py @@ -5,6 +5,7 @@ from abc import ABCMeta, abstractmethod from itertools import product from string import Formatter +from typing import Optional import geopandas as gpd import numpy as np @@ -125,6 +126,8 @@ def __init__( driver_kwargs={}, name="", # optional for now catalog_name="", # optional for now + provider: Optional[str] = None, + version: Optional[str] = None, ): """General Interface to data source for HydroMT. @@ -170,6 +173,8 @@ def __init__( """ self.name = name self.catalog_name = catalog_name + self.provider = provider + self.version = str(version) if version is not None else None # version as str # general arguments self.path = path # driver and driver keyword-arguments @@ -227,6 +232,13 @@ def __repr__(self): """Pretty print string representation of self.""" return self.__str__() + def __eq__(self, other: object) -> bool: + """Return True if self and other are equal.""" + if type(other) is type(self): + return self.to_dict() == other.to_dict() + else: + return False + def _parse_zoom_level( self, zoom_level: int | tuple = None, diff --git a/hydromt/data_adapter/dataframe.py b/hydromt/data_adapter/dataframe.py index 67e5691cc..895d14dc0 100644 --- a/hydromt/data_adapter/dataframe.py +++ b/hydromt/data_adapter/dataframe.py @@ -3,7 +3,7 @@ import os import warnings from os.path import join -from typing import Union +from typing import Optional, Union import numpy as np import pandas as pd @@ -27,9 +27,9 @@ class DataFrameAdapter(DataAdapter): def __init__( self, path: str, - driver: str = None, + driver: Optional[str] = None, filesystem: str = "local", - nodata: Union[dict, float, int] = None, + nodata: Optional[Union[dict, float, int]] = None, rename: dict = {}, unit_mult: dict = {}, unit_add: dict = {}, @@ -38,6 +38,8 @@ def __init__( driver_kwargs: dict = {}, name: str = "", # optional for now catalog_name: str = "", # optional for now + provider: Optional[str] = None, + version: Optional[str] = None, **kwargs, ): """Initiate data adapter for 2D tabular data. @@ -106,6 +108,8 @@ def __init__( driver_kwargs=driver_kwargs, name=name, catalog_name=catalog_name, + provider=provider, + version=version, ) def to_file( diff --git a/hydromt/data_adapter/geodataframe.py b/hydromt/data_adapter/geodataframe.py index eddfac0de..989c43fe9 100644 --- a/hydromt/data_adapter/geodataframe.py +++ b/hydromt/data_adapter/geodataframe.py @@ -46,6 +46,8 @@ def __init__( driver_kwargs: dict = {}, name: str = "", # optional for now catalog_name: str = "", # optional for now + provider=None, + version=None, **kwargs, ): """Initiate data adapter for geospatial vector data. 
@@ -116,6 +118,8 @@ def __init__( driver_kwargs=driver_kwargs, name=name, catalog_name=catalog_name, + provider=provider, + version=version, ) self.crs = crs diff --git a/hydromt/data_adapter/geodataset.py b/hydromt/data_adapter/geodataset.py index 12e84e14c..c7b6d39db 100644 --- a/hydromt/data_adapter/geodataset.py +++ b/hydromt/data_adapter/geodataset.py @@ -47,6 +47,8 @@ def __init__( driver_kwargs: dict = {}, name: str = "", # optional for now catalog_name: str = "", # optional for now + provider=None, + version=None, **kwargs, ): """Initiate data adapter for geospatial timeseries data. @@ -123,6 +125,8 @@ def __init__( driver_kwargs=driver_kwargs, name=name, catalog_name=catalog_name, + provider=provider, + version=version, ) self.crs = crs @@ -255,7 +259,6 @@ def get_data( ) kwargs = self.driver_kwargs.copy() - # parse geom, bbox and buffer arguments clip_str = "" if geom is None and bbox is not None: diff --git a/hydromt/data_adapter/rasterdataset.py b/hydromt/data_adapter/rasterdataset.py index 5c0e8a6b7..97b40f47e 100644 --- a/hydromt/data_adapter/rasterdataset.py +++ b/hydromt/data_adapter/rasterdataset.py @@ -50,6 +50,8 @@ def __init__( zoom_levels: dict = {}, name: str = "", # optional for now catalog_name: str = "", # optional for now + provider=None, + version=None, **kwargs, ): """Initiate data adapter for geospatial raster data. @@ -127,6 +129,8 @@ def __init__( driver_kwargs=driver_kwargs, name=name, catalog_name=catalog_name, + provider=provider, + version=version, ) self.crs = crs self.zoom_levels = zoom_levels @@ -273,6 +277,7 @@ def get_data( ) kwargs = self.driver_kwargs.copy() + # zarr can use storage options directly, the rest should be converted to # file-like objects if "storage_options" in kwargs and self.driver == "raster": diff --git a/hydromt/data_catalog.py b/hydromt/data_catalog.py index e59d2fe93..e20889ebb 100644 --- a/hydromt/data_catalog.py +++ b/hydromt/data_catalog.py @@ -5,7 +5,6 @@ from __future__ import annotations import copy -import inspect import itertools import logging import os @@ -13,7 +12,7 @@ import warnings from os.path import abspath, basename, exists, isdir, isfile, join from pathlib import Path -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, TypedDict, Union import geopandas as gpd import numpy as np @@ -23,6 +22,8 @@ import yaml from packaging.version import Version +from hydromt.utils import partition_dictionaries + from . import __version__ from .data_adapter import ( DataAdapter, @@ -39,6 +40,11 @@ "DataCatalog", ] +# just for typehints +SourceSpecDict = TypedDict( + "SourceSpecDict", {"source": str, "provider": str, "version": Union[str, int]} +) + class DataCatalog(object): @@ -128,8 +134,17 @@ def sources(self) -> Dict: return self._sources @property - def keys(self) -> List: + def keys(self) -> List[str]: """Returns list of data source names.""" + warnings.warn( + "Using iterating over the DataCatalog directly is deprecated." + "Please use cat.get_source()", + DeprecationWarning, + ) + return list(self._sources.keys()) + + def get_source_names(self) -> List[str]: + """Return a list of all available data source names.""" return list(self._sources.keys()) @property @@ -139,37 +154,200 @@ def predefined_catalogs(self) -> Dict: self.set_predefined_catalogs() return self._catalogs + def get_source( + self, source: str, provider: Optional[str] = None, version: Optional[str] = None + ) -> DataAdapter: + """Return a data source. 
+ + Parameters + ---------- + source : str + Name of the data source. + provider : str, optional + Name of the data provider, by default None. + By default the last added provider is returned. + version : str, optional + Version of the data source, by default None. + By default the newest version of the requested provider is returned. + + Returns + ------- + DataAdapter + DataAdapter object. + """ + source = str(source) + if source not in self._sources: + available_sources = sorted(list(self._sources.keys())) + raise KeyError( + f"Requested unknown data source '{source}' " + f"available sources are: {available_sources}" + ) + available_providers = self._sources[source] + + # make sure all arguments are strings + provider = str(provider) if provider is not None else provider + version = str(version) if version is not None else version + + # find provider matching requested version + if provider is None and version is not None: + providers = [p for p, v in available_providers.items() if version in v] + if len(providers) > 0: # error raised later if no provider found + provider = providers[-1] + + # check if provider is available, otherwise use last added provider + if provider is None: + requested_provider = list(available_providers.keys())[-1] + else: + requested_provider = provider + if requested_provider not in available_providers: + providers = sorted(list(available_providers.keys())) + raise KeyError( + f"Requested unknown provider '{requested_provider}' for " + f"data source '{source}' available providers are: {providers}" + ) + available_versions = available_providers[requested_provider] + + # check if version is available, otherwise use last added version which is + # always the newest version + if version is None: + requested_version = list(available_versions.keys())[-1] + else: + requested_version = version + if requested_version not in available_versions: + data_versions = sorted(list(map(str, available_versions.keys()))) + raise KeyError( + f"Requested unknown version '{requested_version}' for " + f"data source '{source}' and provider '{requested_provider}' " + f"available versions are {data_versions}" + ) + + return self._sources[source][requested_provider][requested_version] + + def add_source(self, source: str, adapter: DataAdapter) -> None: + """Add a new data source to the data catalog. + + The data version and provider are extracted from the DataAdapter object. + + Parameters + ---------- + source : str + Name of the data source. + adapter : DataAdapter + DataAdapter object. + """ + if not isinstance(adapter, DataAdapter): + raise ValueError("Value must be DataAdapter") + + if hasattr(adapter, "version") and adapter.version is not None: + version = adapter.version + else: + version = "_UNSPECIFIED_" # make sure this comes first in sorted list + + if hasattr(adapter, "provider") and adapter.provider is not None: + provider = adapter.provider + else: + provider = adapter.catalog_name + + if source not in self._sources: + self._sources[source] = {} + else: # check if data type is the same as adapter with same name + adapter0 = next(iter(next(iter(self._sources[source].values())).values())) + if adapter0.data_type != adapter.data_type: + raise ValueError( + f"Data source '{source}' already exists with data type " + f"'{adapter0.data_type}' but new data source has data type " + f"'{adapter.data_type}'." 
+ ) + + if provider not in self._sources[source]: + versions = {version: adapter} + else: + versions = self._sources[source][provider] + if provider in self._sources[source] and version in versions: + warnings.warn( + f"overwriting data source '{source}' with " + f"provider {provider} and version {version}.", + UserWarning, + ) + # update and sort dictionary -> make sure newest version is last + versions.update({version: adapter}) + versions = {k: versions[k] for k in sorted(list(versions.keys()))} + + self._sources[source][provider] = versions + def __getitem__(self, key: str) -> DataAdapter: """Get the source.""" - return self._sources[key] + warnings.warn( + 'Using iterating over the DataCatalog directly is deprecated."\ + " Please use cat.get_source("name")', + DeprecationWarning, + ) + return self.get_source(key) def __setitem__(self, key: str, value: DataAdapter) -> None: """Set or update adaptors.""" - if not isinstance(value, DataAdapter): - raise ValueError(f"Value must be DataAdapter, not {type(key).__name__}.") - if key in self._sources: - self.logger.warning(f"Overwriting data source {key}.") - return self._sources.__setitem__(key, value) + warnings.warn( + "Using DataCatalog as a dictionary directly is deprecated." + " Please use cat.add_source(adapter)", + DeprecationWarning, + ) + self.add_source(key, value) + + def iter_sources(self) -> List[Tuple[str, DataAdapter]]: + """Return a flat list of all available data sources with no duplicates.""" + ans = [] + for source_name, available_providers in self._sources.items(): + for _, available_versions in available_providers.items(): + for _, adapter in available_versions.items(): + ans.append((source_name, adapter)) + + return ans def __iter__(self): """Iterate over sources.""" - return self._sources.__iter__() + warnings.warn( + "Using iterating over the DataCatalog directly is deprecated." + " Please use cat.iter_sources()", + DeprecationWarning, + ) + return self.iter_sources() def __len__(self): """Return number of sources.""" - return self._sources.__len__() + return len(self.iter_sources()) def __repr__(self): """Prettyprint the sources.""" return self.to_dataframe().__repr__() + def __eq__(self, other) -> bool: + if type(other) is type(self): + if len(self) != len(other): + return False + for name, source in self.iter_sources(): + try: + other_source = other.get_source( + name, provider=source.provider, version=source.version + ) + except KeyError: + return False + if source != other_source: + return False + else: + return False + return True + def _repr_html_(self): return self.to_dataframe()._repr_html_() def update(self, **kwargs) -> None: - """Add data sources to library.""" + """Add data sources to library or update them.""" for k, v in kwargs.items(): - self[k] = v + self.add_source(k, v) + + def update_sources(self, **kwargs) -> None: + """Add data sources to library or update them.""" + self.update(**kwargs) def set_predefined_catalogs(self, urlpath: Union[Path, str] = None) -> Dict: """Initialise the predefined catalogs.""" @@ -195,7 +373,7 @@ def set_predefined_catalogs(self, urlpath: Union[Path, str] = None) -> Dict: def from_artifacts( self, name: str = "artifact_data", version: str = "latest" - ) -> None: + ) -> DataCatalog: """Parse artifacts. Deprecated method. Use @@ -207,15 +385,35 @@ def from_artifacts( Catalog name. If None (default) sample data is downloaded. version : str, optional Release version. By default it takes the latest known release. 
+ + Returns + ------- + DataCatalog + DataCatalog object with parsed artifact data. """ warnings.warn( '"from_artifacts" is deprecated. Use "from_predefined_catalogs instead".', DeprecationWarning, ) - self.from_predefined_catalogs(name, version) + return self.from_predefined_catalogs(name, version) + + def from_predefined_catalogs( + self, name: str, version: str = "latest" + ) -> DataCatalog: + """Add data sources from a predefined data catalog. + + Parameters + ---------- + name : str + Catalog name. + version : str, optional + Catlog release version. By default it takes the latest known release. - def from_predefined_catalogs(self, name: str, version: str = "latest") -> None: - """Generate a catalogue from one of the predefined ones.""" + Returns + ------- + DataCatalog + DataCatalog object with parsed predefined catalog added. + """ if "=" in name: name, version = name.split("=")[0], name.split("=")[-1] if name not in self.predefined_catalogs: @@ -236,12 +434,27 @@ def from_predefined_catalogs(self, name: str, version: str = "latest") -> None: self.from_archive(urlpath, name=name, version=version) else: self.logger.info(f"Reading data catalog {name} {version}") - self.from_yml(urlpath) + self.from_yml(urlpath, catalog_name=name) def from_archive( self, urlpath: Union[Path, str], version: str = None, name: str = None - ) -> None: - """Read a data archive including a data_catalog.yml file.""" + ) -> DataCatalog: + """Read a data archive including a data_catalog.yml file. + + Parameters + ---------- + urlpath : str, Path + Path or url to data archive. + version : str, optional + Version of data archive, by default None. + name : str, optional + Name of data catalog, by default None. + + Returns + ------- + DataCatalog + DataCatalog object with parsed data archive added. + """ name = basename(urlpath).split(".")[0] if name is None else name root = join(self._cache_dir, name) if version is not None: @@ -258,11 +471,15 @@ def from_archive( self.logger.debug(f"Unpacking data from {archive_fn}") shutil.unpack_archive(archive_fn, root) # parse catalog - self.from_yml(yml_fn) + return self.from_yml(yml_fn, catalog_name=name) def from_yml( - self, urlpath: Union[Path, str], root: str = None, mark_used: bool = False - ) -> None: + self, + urlpath: Union[Path, str], + root: str = None, + catalog_name: str = None, + mark_used: bool = False, + ) -> DataCatalog: """Add data sources based on yaml file. Parameters @@ -293,9 +510,8 @@ def from_yml( data_type: driver: filesystem: - kwargs: + driver_kwargs: : - crs: nodata: : rename: @@ -314,9 +530,11 @@ def from_yml( placeholders: : : - zoom_levels: - : - : + + Returns + ------- + DataCatalog + DataCatalog object with parsed yaml file added. 
""" self.logger.info(f"Parsing data catalog from {urlpath}") yml = _yml_from_uri_or_path(urlpath) @@ -324,24 +542,31 @@ def from_yml( meta = dict() # legacy code with root/category at highest yml level if "root" in yml: + warnings.warn( + "The 'root' key is deprecated, use 'meta: root' instead.", + DeprecationWarning, + ) meta.update(root=yml.pop("root")) if "category" in yml: + warnings.warn( + "The 'category' key is deprecated, use 'meta: category' instead.", + DeprecationWarning, + ) meta.update(category=yml.pop("category")) + # read meta data meta = yml.pop("meta", meta) - self_version = Version(__version__) + # check version required hydromt version hydromt_version = meta.get("hydromt_version", __version__) + self_version = Version(__version__) yml_version = Version(hydromt_version) - if yml_version > self_version: self.logger.warning( f"Specified HydroMT version ({hydromt_version}) \ more recent than installed version ({__version__}).", ) - - catalog_name = meta.get("name", "".join(basename(urlpath).split(".")[:-1])) - - # TODO keep meta data!! Note only possible if yml files are not merged + if catalog_name is None: + catalog_name = meta.get("name", "".join(basename(urlpath).split(".")[:-1])) if root is None: root = meta.get("root", os.path.dirname(urlpath)) self.from_dict( @@ -359,7 +584,7 @@ def from_dict( root: Union[str, Path] = None, category: str = None, mark_used: bool = False, - ) -> None: + ) -> DataCatalog: """Add data sources based on dictionary. Parameters @@ -389,15 +614,13 @@ def from_dict( "data_type": , "driver": , "filesystem": , - "kwargs": {: }, - "crs": , + "driver_kwargs": {: }, "nodata": , "rename": {: }, "unit_add": {: }, "unit_mult": {: }, "meta": {...}, "placeholders": {: }, - "zoom_levels": {: }, } : { ... @@ -405,15 +628,26 @@ def from_dict( } """ - data_dict = _parse_data_dict( - data_dict, - catalog_name=catalog_name, - root=root, - category=category, - ) - self.update(**data_dict) - if mark_used: - self._used_data.extend(list(data_dict.keys())) + meta = data_dict.pop("meta", {}) + if "root" in meta and root is None: + root = meta.pop("root") + if "category" in meta and category is None: + category = meta.pop("category") + if "name" in meta and catalog_name is None: + catalog_name = meta.pop("name") + for name, source_dict in _denormalise_data_dict(data_dict): + adapter = _parse_data_source_dict( + name, + source_dict, + catalog_name=catalog_name, + root=root, + category=category, + ) + self.add_source(name, adapter) + if mark_used: + self._used_data.append(name) + + return self def to_yml( self, @@ -484,10 +718,12 @@ def to_dict( root = abspath(root) meta.update(**{"root": root}) root_drive = os.path.splitdrive(root)[0] - for name, source in sorted(self._sources.items()): # alphabetical order + sorted_sources = sorted(self.iter_sources(), key=lambda x: x[0]) + for name, source in sorted_sources: # alphabetical order if source_names is not None and name not in source_names: continue source_dict = source.to_dict() + if root is not None: path = source_dict["path"] # is abspath source_drive = os.path.splitdrive(path)[0] @@ -500,19 +736,52 @@ def to_dict( ).replace("\\", "/") # remove non serializable entries to prevent errors source_dict = _process_dict(source_dict, logger=self.logger) # TODO TEST - sources_out.update({name: source_dict}) + if name in sources_out: + existing = sources_out.pop(name) + if existing == source_dict: + sources_out.update({name: source_dict}) + continue + if "variants" in existing: + variants = existing.pop("variants") + _, 
variant, _ = partition_dictionaries(source_dict, existing) + variants.append(variant) + existing["variants"] = variants + else: + base, diff_existing, diff_new = partition_dictionaries( + source_dict, existing + ) + # provider and version should always be in variants list + provider = base.pop("provider", None) + if provider is not None: + diff_existing["provider"] = provider + diff_new["provider"] = provider + version = base.pop("version", None) + if version is not None: + diff_existing["version"] = version + diff_new["version"] = version + base["variants"] = [diff_new, diff_existing] + sources_out[name] = base + else: + sources_out.update({name: source_dict}) if meta: sources_out = {"meta": meta, **sources_out} return sources_out def to_dataframe(self, source_names: List = []) -> pd.DataFrame: """Return data catalog summary as DataFrame.""" - d = dict() - for name, source in self._sources.items(): + d = [] + for name, source in self.iter_sources(): if len(source_names) > 0 and name not in source_names: continue - d[name] = source.summary() - return pd.DataFrame.from_dict(d, orient="index") + d.append( + { + "name": name, + "provider": source.provider, + "version": source.version, + **source.summary(), + } + ) + return pd.DataFrame.from_records(d).set_index("name") def export_data( self, @@ -563,7 +832,18 @@ def export_data( variables = name.split("[")[-1].split("]")[0].split(",") name = name.split("[")[0] source_vars[name] = variables - sources[name] = copy.deepcopy(self.sources[name]) + + source = self.get_source(name) + provider = source.provider + version = source.version + + if name not in sources: + sources[name] = {} + if provider not in sources[name]: + sources[name][provider] = {} + + sources[name][provider][version] = copy.deepcopy(source) + else: sources = copy.deepcopy(self.sources) @@ -576,62 +856,78 @@ def export_data( sources_out = {} # export data and update sources - for key, source in sources.items(): - try: - # read slice of source and write to file - self.logger.debug(f"Exporting {key}.") - if not unit_conversion: - unit_mult = source.unit_mult - unit_add = source.unit_add - source.unit_mult = {} - source.unit_add = {} - fn_out, driver, source_kwargs = source.to_file( - data_root=data_root, - data_name=key, - variables=source_vars.get(key, None), - bbox=bbox, - time_tuple=time_tuple, - logger=self.logger, - ) - if fn_out is None: - self.logger.warning(f"{key} file contains no data within domain") - continue - # update path & driver and remove kwargs and rename in output sources - if unit_conversion: - source.unit_mult = {} - source.unit_add = {} - else: - source.unit_mult = unit_mult - source.unit_add = unit_add - source.path = fn_out - source.driver = driver - source.filesystem = "local" - source.driver_kwargs = {} - source.rename = {} - if key in sources_out: - self.logger.warning( - f"{key} already exists in data catalog and is overwritten." 
- ) - sources_out[key] = source - except FileNotFoundError: - self.logger.warning(f"{key} file not found at {source.path}") + for key, available_variants in sources.items(): + for provider, available_versions in available_variants.items(): + for version, source in available_versions.items(): + try: + # read slice of source and write to file + self.logger.debug(f"Exporting {key}.") + if not unit_conversion: + unit_mult = source.unit_mult + unit_add = source.unit_add + source.unit_mult = {} + source.unit_add = {} + fn_out, driver, source_kwargs = source.to_file( + data_root=data_root, + data_name=key, + variables=source_vars.get(key, None), + bbox=bbox, + time_tuple=time_tuple, + logger=self.logger, + ) + if fn_out is None: + self.logger.warning( + f"{key} file contains no data within domain" + ) + continue + # update path & driver and remove kwargs + # and rename in output sources + if unit_conversion: + source.unit_mult = {} + source.unit_add = {} + else: + source.unit_mult = unit_mult + source.unit_add = unit_add + source.path = fn_out + source.driver = driver + source.filesystem = "local" + source.driver_kwargs = {} + source.rename = {} + if key in sources_out: + self.logger.warning( + f"{key} already exists in data catalog, overwriting..." + ) + if key not in sources_out: + sources_out[key] = {} + if provider not in sources_out[key]: + sources_out[key][provider] = {} + + sources_out[key][provider][version] = source + except FileNotFoundError: + self.logger.warning(f"{key} file not found at {source.path}") # write data catalog to yml data_catalog_out = DataCatalog() - data_catalog_out._sources = sources_out + for key, available_variants in sources_out.items(): + for provider, available_versions in available_variants.items(): + for version, adapter in available_versions.items(): + data_catalog_out.add_source(key, adapter) + data_catalog_out.to_yml(fn, root="auto", meta=meta) def get_rasterdataset( self, - data_like: Union[str, Path, xr.Dataset, xr.DataArray], - bbox: List = None, - geom: gpd.GeoDataFrame = None, - zoom_level: int | tuple = None, + data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], + bbox: Optional[List] = None, + geom: Optional[gpd.GeoDataFrame] = None, + zoom_level: Optional[int | tuple] = None, buffer: Union[float, int] = 0, - align: bool = None, - variables: Union[List, str] = None, - time_tuple: Tuple = None, - single_var_as_array: bool = True, + align: Optional[bool] = None, + variables: Optional[Union[List, str]] = None, + time_tuple: Optional[Tuple] = None, + single_var_as_array: Optional[bool] = True, + provider: Optional[str] = None, + version: Optional[str] = None, **kwargs, ) -> xr.Dataset: """Return a clipped, sliced and unified RasterDataset. @@ -648,10 +944,12 @@ def get_rasterdataset( Arguments --------- - data_like: str, Path, xr.Dataset, xr.Datarray - Data catalog key, path to raster file or raster xarray data object. + data_like: str, Path, Dict, xr.Dataset, xr.Datarray + DataCatalog key, path to raster file or raster xarray data object. + The catalog key can be a string or a dictionary with the following keys: + {'name', 'provider', 'version'}. If a path to a raster file is provided it will be added - to the data_catalog with its based on the file basename without extension. + to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). 
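# The rewritten get_rasterdataset (continued in the hunks below) accepts the new
# provider/version keywords as well as a {"source", "provider", "version"} dictionary.
# A short usage sketch, assuming a catalog yaml with the esa_worldcover variants shown
# in the documentation changes above:
from hydromt import DataCatalog

cat = DataCatalog("data_catalog.yml")
# newest version from the last added provider
da = cat.get_rasterdataset("esa_worldcover")
# pin the provider and/or version via keyword arguments
da_local = cat.get_rasterdataset("esa_worldcover", provider="local", version=2020)
# or pass a source-specification dictionary instead of a plain name
da_aws = cat.get_rasterdataset({"source": "esa_worldcover", "provider": "aws", "version": 2020})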
@@ -674,6 +972,10 @@ def get_rasterdataset( single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. + provider: str, optional + Data source provider. If None (default) the last added provider is used. + version: str, optional + Data source version. If None (default) the newest version is used. **kwargs: Additional keyword arguments that are passed to the `RasterDatasetAdapter` function. Only used if `data_like` is a path to a raster file. @@ -683,21 +985,29 @@ def get_rasterdataset( obj: xarray.Dataset or xarray.DataArray RasterDataset """ - if isinstance(data_like, (xr.DataArray, xr.Dataset)): - return data_like - elif not isinstance(data_like, (str, Path)): - raise ValueError(f'Unknown raster data type "{type(data_like).__name__}"') + if isinstance(data_like, dict): + data_like, provider, version = _parse_data_like_dict( + data_like, provider, version + ) - if data_like not in self.sources and exists(abspath(data_like)): - path = str(abspath(data_like)) - name = basename(data_like).split(".")[0] - source = RasterDatasetAdapter(path=path, **kwargs) - self.update(**{name: source}) - elif data_like in self.sources: - name = data_like - source = self.sources[name] + if isinstance(data_like, (str, Path)): + if isinstance(data_like, str) and data_like in self.sources: + name = data_like + source = self.get_source(name, provider=provider, version=version) + elif exists(abspath(data_like)): + path = str(abspath(data_like)) + if "provider" not in kwargs: + kwargs.update({"provider": "local"}) + source = RasterDatasetAdapter(path=path, **kwargs) + name = basename(data_like) + self.add_source(name, source) + else: + raise FileNotFoundError(f"No such file or catalog source: {data_like}") + elif isinstance(data_like, (xr.DataArray, xr.Dataset)): + # TODO apply bbox, geom, buffer, align, variables, time_tuple + return data_like else: - raise FileNotFoundError(f"No such file or catalog key: {data_like}") + raise ValueError(f'Unknown raster data type "{type(data_like).__name__}"') self._used_data.append(name) self.logger.info( @@ -720,12 +1030,14 @@ def get_rasterdataset( def get_geodataframe( self, - data_like: Union[str, Path, gpd.GeoDataFrame], - bbox: List = None, - geom: gpd.GeoDataFrame = None, + data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], + bbox: Optional[List] = None, + geom: Optional[gpd.GeoDataFrame] = None, buffer: Union[float, int] = 0, - variables: Union[List, str] = None, + variables: Optional[Union[List, str]] = None, predicate: str = "intersects", + provider: Optional[str] = None, + version: Optional[str] = None, **kwargs, ): """Return a clipped and unified GeoDataFrame (vector). @@ -739,8 +1051,10 @@ def get_geodataframe( --------- data_like: str, Path, gpd.GeoDataFrame Data catalog key, path to vector file or a vector geopandas object. + The catalog key can be a string or a dictionary with the following keys: + {'name', 'provider', 'version'}. If a path to a vector file is provided it will be added - to the data_catalog with its based on the file basename without extension. + to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). @@ -757,6 +1071,10 @@ def get_geodataframe( variables : str or list of str, optional. Names of GeoDataFrame columns to return. By default all columns are returned. + provider: str, optional + Data source provider. 
If None (default) the last added provider is used. + version: str, optional + Data source version. If None (default) the newest version is used. **kwargs: Additional keyword arguments that are passed to the `GeoDataFrameAdapter` function. Only used if `data_like` is a path to a vector file. @@ -766,21 +1084,28 @@ def get_geodataframe( gdf: geopandas.GeoDataFrame GeoDataFrame """ - if isinstance(data_like, gpd.GeoDataFrame): + if isinstance(data_like, dict): + data_like, provider, version = _parse_data_like_dict( + data_like, provider, version + ) + if isinstance(data_like, (str, Path)): + if str(data_like) in self.sources: + name = data_like + source = self.get_source(name, provider=provider, version=version) + elif exists(abspath(data_like)): + path = str(abspath(data_like)) + if "provider" not in kwargs: + kwargs.update({"provider": "local"}) + source = GeoDataFrameAdapter(path=path, **kwargs) + name = basename(data_like) + self.add_source(name, source) + else: + raise FileNotFoundError(f"No such file or catalog source: {data_like}") + elif isinstance(data_like, gpd.GeoDataFrame): + # TODO apply bbox, geom, buffer, predicate, variables return data_like - elif not isinstance(data_like, (str, Path)): - raise ValueError(f'Unknown vector data type "{type(data_like).__name__}"') - - if data_like not in self.sources and exists(abspath(data_like)): - path = str(abspath(data_like)) - name = basename(data_like).split(".")[0] - source = GeoDataFrameAdapter(path=path, **kwargs) - self.update(**{name: source}) - elif data_like in self.sources: - name = data_like - source = self.sources[name] else: - raise FileNotFoundError(f"No such file or catalog key: {data_like}") + raise ValueError(f'Unknown vector data type "{type(data_like).__name__}"') self._used_data.append(name) self.logger.info( @@ -799,13 +1124,15 @@ def get_geodataframe( def get_geodataset( self, - data_like: Union[Path, str, xr.DataArray, xr.Dataset], - bbox: List = None, - geom: gpd.GeoDataFrame = None, + data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], + bbox: Optional[List] = None, + geom: Optional[gpd.GeoDataFrame] = None, buffer: Union[float, int] = 0, - variables: List = None, - time_tuple: Tuple = None, + variables: Optional[List] = None, + time_tuple: Optional[Tuple] = None, single_var_as_array: bool = True, + provider: Optional[str] = None, + version: Optional[str] = None, **kwargs, ) -> xr.Dataset: """Return a clipped, sliced and unified GeoDataset. @@ -823,8 +1150,10 @@ def get_geodataset( --------- data_like: str, Path, xr.Dataset, xr.DataArray Data catalog key, path to geodataset file or geodataset xarray object. + The catalog key can be a string or a dictionary with the following keys: + {'name', 'provider', 'version'}. If a path to a file is provided it will be added - to the data_catalog with its based on the file basename without extension. + to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). @@ -832,8 +1161,6 @@ def get_geodataset( A geometry defining the area of interest. buffer : float, optional Buffer around the `bbox` or `geom` area of interest in meters. By default 0. - align : float, optional - Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of GeoDataset variables to return. By default all dataset variables are returned. 
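# get_geodataset (rewritten below) follows the same pattern: a catalog name can be pinned
# to a provider/version, and a bare file path is registered on the fly under its basename.
# Sketch only -- the source name, file path and "netcdf" driver below are hypothetical.
from hydromt import DataCatalog

cat = DataCatalog("data_catalog.yml")
# catalog entry, optionally pinned to a specific version
ds = cat.get_geodataset("waterlevel_obs", version="2020")
# a netCDF file that is not in the catalog is added as a new source (assuming it exists on disk)
ds_local = cat.get_geodataset("./waterlevels.nc", driver="netcdf")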
@@ -852,21 +1179,28 @@ def get_geodataset( obj: xarray.Dataset or xarray.DataArray GeoDataset """ - if isinstance(data_like, (xr.DataArray, xr.Dataset)): + if isinstance(data_like, dict): + data_like, provider, version = _parse_data_like_dict( + data_like, provider, version + ) + if isinstance(data_like, (str, Path)): + if isinstance(data_like, str) and data_like in self.sources: + name = data_like + source = self.get_source(name, provider=provider, version=version) + elif exists(abspath(data_like)): + path = str(abspath(data_like)) + if "provider" not in kwargs: + kwargs.update({"provider": "local"}) + source = GeoDatasetAdapter(path=path, **kwargs) + name = basename(data_like) + self.add_source(name, source) + else: + raise FileNotFoundError(f"No such file or catalog source: {data_like}") + elif isinstance(data_like, (xr.DataArray, xr.Dataset)): + # TODO apply bbox, geom, buffer, variables, time_tuple return data_like - elif not isinstance(data_like, (str, Path)): - raise ValueError(f'Unknown geo data type "{type(data_like).__name__}"') - - if data_like not in self.sources and exists(abspath(data_like)): - path = str(abspath(data_like)) - name = basename(data_like).split(".")[0] - source = GeoDatasetAdapter(path=path, **kwargs) - self.update(**{name: source}) - elif data_like in self.sources: - name = data_like - source = self.sources[name] else: - raise FileNotFoundError(f"No such file or catalog key: {data_like}") + raise ValueError(f'Unknown geo data type "{type(data_like).__name__}"') self._used_data.append(name) self.logger.info( @@ -880,15 +1214,16 @@ def get_geodataset( variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, - logger=self.logger, ) return obj def get_dataframe( self, - data_like: Union[str, Path, pd.DataFrame], - variables: list = None, - time_tuple: tuple = None, + data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], + variables: Optional[list] = None, + time_tuple: Optional[Tuple] = None, + provider: Optional[str] = None, + version: Optional[str] = None, **kwargs, ): """Return a unified and sliced DataFrame. @@ -896,9 +1231,11 @@ def get_dataframe( Parameters ---------- data_like : str, Path, pd.DataFrame - Data catalog key, path to tabular data file or tabular pandas dataframe - object. If a path to a tabular data file is provided it will be added - to the data_catalog with its based on the file basename without extension. + Data catalog key, path to tabular data file or tabular pandas dataframe. + The catalog key can be a string or a dictionary with the following keys: + {'name', 'provider', 'version'}. + If a path to a tabular data file is provided it will be added + to the catalog with its based on the file basename. variables : str or list of str, optional. Names of GeoDataset variables to return. By default all dataset variables are returned. 
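# get_dataframe gets the same treatment in the hunk below. A sketch of the dictionary form
# and of reading a plain file; the source name and csv path are hypothetical.
from hydromt import DataCatalog

cat = DataCatalog("data_catalog.yml")
# tabular source pinned to a specific provider and version
df = cat.get_dataframe({"source": "discharge_obs", "provider": "local", "version": "2023"})
# a csv file that is not in the catalog is registered under its basename
df_csv = cat.get_dataframe("./observations.csv", driver="csv")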
@@ -914,21 +1251,27 @@ def get_dataframe( pd.DataFrame Tabular data """ - if isinstance(data_like, pd.DataFrame): + if isinstance(data_like, dict): + data_like, provider, version = _parse_data_like_dict( + data_like, provider, version + ) + if isinstance(data_like, (str, Path)): + if isinstance(data_like, str) and data_like in self.sources: + name = data_like + source = self.get_source(name, provider=provider, version=version) + elif exists(abspath(data_like)): + path = str(abspath(data_like)) + if "provider" not in kwargs: + kwargs.update({"provider": "local"}) + source = DataFrameAdapter(path=path, **kwargs) + name = basename(data_like) + self.add_source(name, source) + else: + raise FileNotFoundError(f"No such file or catalog source: {data_like}") + elif isinstance(data_like, pd.DataFrame): return data_like - elif not isinstance(data_like, (str, Path)): - raise ValueError(f'Unknown tabular data type "{type(data_like).__name__}"') - - if data_like not in self.sources and exists(abspath(data_like)): - path = str(abspath(data_like)) - name = basename(data_like).split(".")[0] - source = DataFrameAdapter(path=path, **kwargs) - self.update(**{name: source}) - elif data_like in self.sources: - name = data_like - source = self.sources[name] else: - raise FileNotFoundError(f"No such file or catalog key: {data_like}") + raise ValueError(f'Unknown tabular data type "{type(data_like).__name__}"') self._used_data.append(name) self.logger.info( @@ -943,8 +1286,26 @@ def get_dataframe( return obj -def _parse_data_dict( - data_dict: Dict, +def _parse_data_like_dict( + data_like: SourceSpecDict, + provider: Optional[str] = None, + version: Optional[str] = None, +): + if not SourceSpecDict.__required_keys__.issuperset(set(data_like.keys())): + unknown_keys = set(data_like.keys()) - SourceSpecDict.__required_keys__ + raise ValueError(f"Unknown keys in requested data source: {unknown_keys}") + elif "source" not in data_like: + raise ValueError("No source key found in requested data source") + else: + source = data_like.get("source") + provider = data_like.get("provider", provider) + version = data_like.get("version", version) + return source, provider, version + + +def _parse_data_source_dict( + name: str, + data_source_dict: Dict, catalog_name: str = "", root: Union[Path, str] = None, category: str = None, @@ -957,86 +1318,45 @@ def _parse_data_dict( "GeoDataset": GeoDatasetAdapter, "DataFrame": DataFrameAdapter, } - # NOTE: shouldn't the kwarg overwrite the dict/yml ? 
- if root is None: - root = data_dict.pop("root", None) - # parse data - data = dict() - for name, source in data_dict.items(): - source = source.copy() # important as we modify with pop - - if "alias" in source: - alias = source.pop("alias") - if alias not in data_dict: - raise ValueError(f"alias {alias} not found in data_dict.") - # use alias source but overwrite any attributes with original source - source_org = source.copy() - source = data_dict[alias].copy() - source.update(source_org) - if "path" not in source: - raise ValueError(f"{name}: Missing required path argument.") - data_type = source.pop("data_type", None) - if data_type is None: - raise ValueError(f"{name}: Data type missing.") - elif data_type not in ADAPTERS: - raise ValueError(f"{name}: Data type {data_type} unknown") - adapter = ADAPTERS.get(data_type) - # Only for local files - path = source.pop("path") - # if remote path, keep as is else call abs_path method to solve local files - if not _uri_validator(str(path)): - path = abs_path(root, path) - meta = source.pop("meta", {}) - if "category" not in meta and category is not None: - meta.update(category=category) - # Get unit attrs if given from source - attrs = source.pop("attrs", {}) - # lower kwargs for backwards compatability - # FIXME this could be problamatic if driver kwargs conflict DataAdapter - # arguments - driver_kwargs = source.pop("driver_kwargs", source.pop("kwargs", {})) - for driver_kwarg in driver_kwargs: - # required for geodataset where driver_kwargs can be a path - if "fn" in driver_kwarg: - driver_kwargs.update( - {driver_kwarg: abs_path(root, driver_kwargs[driver_kwarg])} - ) - for opt in source: - if "fn" in opt: # get absolute paths for file names - source.update({opt: abs_path(root, source[opt])}) - if "placeholders" in source: - # pop avoid placeholders being passed to adapter - options = source.pop("placeholders") - for combination in itertools.product(*options.values()): - path_n = path - name_n = name - for k, v in zip(options.keys(), combination): - path_n = path_n.replace("{" + k + "}", v) - name_n = name_n.replace("{" + k + "}", v) - - data[name_n] = adapter( - path=path_n, - name=name_n, - catalog_name=catalog_name, - meta=meta, - attrs=attrs, - driver_kwargs=driver_kwargs, - **source, # key word arguments specific to certain adaptors - ) - - else: - data[name] = adapter( - path=path, - name=name, - catalog_name=catalog_name, - meta=meta, - attrs=attrs, - driver_kwargs=driver_kwargs, - **source, + source = data_source_dict.copy() # important as we modify with pop + + # parse path + if "path" not in source: + raise ValueError(f"{name}: Missing required path argument.") + # if remote path, keep as is else call abs_path method to solve local files + path = source.pop("path") + if not _uri_validator(str(path)): + path = abs_path(root, path) + # parse data type > adapter + data_type = source.pop("data_type", None) + if data_type is None: + raise ValueError(f"{name}: Data type missing.") + elif data_type not in ADAPTERS: + raise ValueError(f"{name}: Data type {data_type} unknown") + adapter = ADAPTERS.get(data_type) + # source meta data + meta = source.pop("meta", {}) + if "category" not in meta and category is not None: + meta.update(category=category) + + # driver arguments + driver_kwargs = source.pop("driver_kwargs", source.pop("kwargs", {})) + for driver_kwarg in driver_kwargs: + # required for geodataset where driver_kwargs can be a path + if "fn" in driver_kwarg: + driver_kwargs.update( + {driver_kwarg: abs_path(root, 
driver_kwargs[driver_kwarg])} ) - return data + return adapter( + path=path, + name=name, + catalog_name=catalog_name, + meta=meta, + driver_kwargs=driver_kwargs, + **source, + ) def _yml_from_uri_or_path(uri_or_path: Union[Path, str]) -> Dict: @@ -1062,6 +1382,54 @@ def _process_dict(d: Dict, logger=logger) -> Dict: return d +def _denormalise_data_dict(data_dict) -> List[Tuple[str, Dict]]: + """Return a flat list of with data name, dictionary of input data_dict. + + Expand possible versions, aliases and variants in data_dict. + """ + data_list = [] + for name, source in data_dict.items(): + source = copy.deepcopy(source) + data_dicts = [] + if "alias" in source: + alias = source.pop("alias") + warnings.warn( + "The use of alias is deprecated, please add a version on the aliased" + "catalog instead.", + DeprecationWarning, + ) + if alias not in data_dict: + raise ValueError(f"alias {alias} not found in data_dict.") + # use alias source but overwrite any attributes with original source + source_copy = data_dict[alias].copy() + source_copy.update(source) + data_dicts.append({name: source_copy}) + elif "variants" in source: + variants = source.pop("variants") + for diff in variants: + source_copy = copy.deepcopy(source) + source_copy.update(**diff) + data_dicts.append({name: source_copy}) + elif "placeholders" in source: + options = source.pop("placeholders") + for combination in itertools.product(*options.values()): + source_copy = copy.deepcopy(source) + name_copy = name + for k, v in zip(options.keys(), combination): + name_copy = name_copy.replace("{" + k + "}", v) + source_copy["path"] = source_copy["path"].replace("{" + k + "}", v) + data_dicts.append({name_copy: source_copy}) + else: + data_list.append((name, source)) + continue + + # recursively denormalise in case of multiple denormalise keys in source + for item in data_dicts: + data_list.extend(_denormalise_data_dict(item)) + + return data_list + + def abs_path(root: Union[Path, str], rel_path: Union[Path, str]) -> str: path = Path(str(rel_path)) if not path.is_absolute(): @@ -1069,15 +1437,3 @@ def abs_path(root: Union[Path, str], rel_path: Union[Path, str]) -> str: rel_path = join(root, rel_path) path = Path(abspath(rel_path)) return str(path) - - -def _seperate_driver_kwargs_from_kwargs( - kwargs: dict, data_adapter: DataAdapter -) -> Tuple[dict]: - driver_kwargs = kwargs - driver_kwargs_copy = driver_kwargs.copy() - kwargs = {} - for k, v in driver_kwargs_copy.items(): - if k in inspect.signature(data_adapter.__init__).parameters.keys(): - kwargs.update({k: driver_kwargs.pop(k)}) - return kwargs, driver_kwargs diff --git a/hydromt/models/model_api.py b/hydromt/models/model_api.py index 9fee8aed4..99d29a14f 100644 --- a/hydromt/models/model_api.py +++ b/hydromt/models/model_api.py @@ -346,10 +346,15 @@ def setup_region( kind, region = workflows.parse_region(region, logger=self.logger) # NOTE: kind=outlet is deprecated! if kind in ["basin", "subbasin", "interbasin", "outlet"]: + if kind == "outlet": + warnings.warn( + "Using outlet as kind in setup_region is deprecated", + DeprecationWarning, + ) # retrieve global hydrography data (lazy!) 
             ds_org = self.data_catalog.get_rasterdataset(hydrography_fn)
             if "bounds" not in region:
-                region.update(basin_index=self.data_catalog[basin_index_fn])
+                region.update(basin_index=self.data_catalog.get_source(basin_index_fn))
             # get basin geometry
             geom, xy = workflows.get_basin_geometry(
                 ds=ds_org,
diff --git a/hydromt/models/model_grid.py b/hydromt/models/model_grid.py
index bc7f1f548..d96323ce5 100644
--- a/hydromt/models/model_grid.py
+++ b/hydromt/models/model_grid.py
@@ -577,7 +577,7 @@ def setup_grid(
             # retrieve global hydrography data (lazy!)
             ds_hyd = self.data_catalog.get_rasterdataset(hydrography_fn)
             if "bounds" not in region:
-                region.update(basin_index=self.data_catalog[basin_index_fn])
+                region.update(basin_index=self.data_catalog.get_source(basin_index_fn))
             # get basin geometry
             geom, xy = workflows.get_basin_geometry(
                 ds=ds_hyd,
diff --git a/hydromt/utils.py b/hydromt/utils.py
new file mode 100644
index 000000000..bb4d6d0db
--- /dev/null
+++ b/hydromt/utils.py
@@ -0,0 +1,42 @@
+"""Utility functions for hydromt that have no other home."""
+
+
+def partition_dictionaries(left, right):
+    """Calculate a partitioning of the two dictionaries.
+
+    Given dictionaries A and B, this function will return the following partition:
+    (A ∩ B, A - B, B - A)
+    """
+    common = {}
+    left_less_right = {}
+    right_less_left = {}
+    key_union = set(left.keys()) | set(right.keys())
+
+    for key in key_union:
+        value_left = left.get(key, None)
+        value_right = right.get(key, None)
+        if isinstance(value_left, dict) and isinstance(value_right, dict):
+            (
+                common_children,
+                unique_left_children,
+                unique_right_children,
+            ) = partition_dictionaries(value_left, value_right)
+            common[key] = common_children
+            if unique_left_children != unique_right_children:
+                left_less_right[key] = unique_left_children
+                right_less_left[key] = unique_right_children
+        elif value_left == value_right:
+            common[key] = value_left
+        else:
+            if value_left is not None:
+                left_less_right[key] = value_left
+            if value_right is not None:
+                right_less_left[key] = value_right
+
+    return common, left_less_right, right_less_left
+
+
+def _dict_pprint(d):
+    import json
+
+    return json.dumps(d, indent=2)
diff --git a/hydromt/workflows/forcing.py b/hydromt/workflows/forcing.py
index c32b6cc29..55e2872e6 100644
--- a/hydromt/workflows/forcing.py
+++ b/hydromt/workflows/forcing.py
@@ -80,6 +80,7 @@ def precip(
     p_out.attrs.update(unit="mm")
     if freq is not None:
         resample_kwargs.update(upsampling="bfill", downsampling="sum", logger=logger)
+        p_out = resample_time(p_out, freq, conserve_mass=True, **resample_kwargs)
     return p_out
diff --git a/pyproject.toml b/pyproject.toml
index 91109207b..a91be6509 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,6 @@ dependencies = [
     "entrypoints",  # Provide access for Plugins
     "geopandas>=0.10",  # pandas but geo, wraps fiona and shapely
     "gdal>=3.1",  # enable geospacial data manipulation, both raster and victor
-    "fiona==1.8.22",  # IO for vector and shape files
    "numba",  # speed up computations (used in e.g. stats)
    "numpy>=1.20",  # pin necessary to ensure compatability with C headers
    "netcdf4",  # netcfd IO
diff --git a/tests/conftest.py b/tests/conftest.py
index eff430757..1cfe53bb6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -114,7 +114,8 @@ def demda():
         coords={"y": -np.arange(0, 1500, 100), "x": np.arange(0, 1000, 100)},
         attrs=dict(_FillValue=-9999),
     )
-    da.raster.set_crs(3785)
+    # NOTE epsg 3785 is deprecated https://epsg.io/3785
+    da.raster.set_crs(3857)
     return da
@@ -139,7 +140,8 @@ def flwda(flwdir):
         coords=gis_utils.affine_to_coords(flwdir.transform, flwdir.shape),
         attrs=dict(_FillValue=247),
     )
-    da.raster.set_crs(3785)
+    # NOTE epsg 3785 is deprecated https://epsg.io/3785
+    da.raster.set_crs(3857)
     return da
@@ -214,7 +216,8 @@ def model(demda, world, obsda):
     mod = Model()
     mod.setup_region({"geom": demda.raster.box})
     mod.setup_config(**{"header": {"setting": "value"}})
-    mod.set_staticmaps(demda, "elevtn")  # will be deprecated
+    with pytest.deprecated_call():
+        mod.set_staticmaps(demda, "elevtn")
     mod.set_geoms(world, "world")
     mod.set_maps(demda, "elevtn")
     mod.set_forcing(obsda, "waterlevel")
diff --git a/tests/data/aws_esa_worldcover.yml b/tests/data/aws_esa_worldcover.yml
new file mode 100644
index 000000000..9a895581c
--- /dev/null
+++ b/tests/data/aws_esa_worldcover.yml
@@ -0,0 +1,18 @@
+esa_worldcover:
+  crs: 4326
+  data_type: RasterDataset
+  driver: raster
+  filesystem: s3
+  version: 2021
+  provider: aws
+  driver_kwargs:
+    storage_options:
+      anon: true
+  meta:
+    category: landuse
+    source_license: CC BY 4.0
+    source_url: https://doi.org/10.5281/zenodo.5571936
+    source_version: v100
+  path: s3://esa-worldcover/v100/2020/ESA_WorldCover_10m_2020_v100_Map_AWS.vrt
+  rename:
+    ESA_WorldCover_10m_2020_v100_Map_AWS: landuse
diff --git a/tests/data/legacy_esa_worldcover.yml b/tests/data/legacy_esa_worldcover.yml
new file mode 100644
index 000000000..1d9c1d4b1
--- /dev/null
+++ b/tests/data/legacy_esa_worldcover.yml
@@ -0,0 +1,15 @@
+esa_worldcover:
+  crs: 4326
+  data_type: RasterDataset
+  driver: raster
+  driver_kwargs:
+    chunks:
+      x: 36000
+      y: 36000
+  meta:
+    category: landuse
+    source_license: CC BY 4.0
+    source_url: https://doi.org/10.5281/zenodo.5571936
+    source_version: v100
+  path: landuse/esa_worldcover/esa-worldcover.vrt
+  version: 2020
diff --git a/tests/data/merged_esa_worldcover.yml b/tests/data/merged_esa_worldcover.yml
new file mode 100644
index 000000000..7636b370d
--- /dev/null
+++ b/tests/data/merged_esa_worldcover.yml
@@ -0,0 +1,29 @@
+esa_worldcover:
+  crs: 4326
+  data_type: RasterDataset
+  driver: raster
+  filesystem: local
+  driver_kwargs:
+    chunks:
+      x: 36000
+      y: 36000
+  meta:
+    category: landuse
+    source_license: CC BY 4.0
+    source_url: https://doi.org/10.5281/zenodo.5571936
+  variants:
+    - provider: local
+      version: 2021
+      path: landuse/esa_worldcover_2021/esa-worldcover.vrt
+    - provider: local
+      version: 2020
+      path: landuse/esa_worldcover/esa-worldcover.vrt
+    - provider: aws
+      version: 2020
+      path: s3://esa-worldcover/v100/2020/ESA_WorldCover_10m_2020_v100_Map_AWS.vrt
+      rename:
+        ESA_WorldCover_10m_2020_v100_Map_AWS: landuse
+      filesystem: s3
+      driver_kwargs:
+        storage_options:
+          anon: true
diff --git a/tests/test_basin_mask.py b/tests/test_basin_mask.py
index 4bba755f0..5c27f3016 100644
--- a/tests/test_basin_mask.py
+++ b/tests/test_basin_mask.py
@@ -123,7 +123,7 @@ def test_basin(caplog):
     data_catalog = hydromt.DataCatalog(logger=logger)
     ds = data_catalog.get_rasterdataset("merit_hydro")
     gdf_bas_index = data_catalog.get_geodataframe("merit_hydro_index")
-    bas_index = data_catalog["merit_hydro_index"]
+    bas_index = data_catalog.get_source("merit_hydro_index")
 
     with pytest.raises(ValueError, match=r"No basins found"):
         gdf_bas, gdf_out = get_basin_geometry(
diff --git a/tests/test_data_adapter.py b/tests/test_data_adapter.py
index 351a6bcbd..dd90b69ea 100644
--- a/tests/test_data_adapter.py
+++ b/tests/test_data_adapter.py
@@ -41,12 +41,12 @@ def test_resolve_path(tmpdir):
     cat = DataCatalog()
     cat.from_dict(dd)
     # test
-    assert len(cat["test"].resolve_paths()) == 48
-    assert len(cat["test"].resolve_paths(variables=["precip"])) == 24
+    assert len(cat.get_source("test").resolve_paths()) == 48
+    assert len(cat.get_source("test").resolve_paths(variables=["precip"])) == 24
     kwargs = dict(variables=["precip"], time_tuple=("2021-03-01", "2021-05-01"))
-    assert len(cat["test"].resolve_paths(**kwargs)) == 3
+    assert len(cat.get_source("test").resolve_paths(**kwargs)) == 3
     with pytest.raises(FileNotFoundError, match="No such file found:"):
-        cat["test"].resolve_paths(variables=["waves"])
+        cat.get_source("test").resolve_paths(variables=["waves"])
 
 
 def test_rasterdataset(rioda, tmpdir):
@@ -57,12 +57,12 @@ def test_rasterdataset(rioda, tmpdir):
     da1 = data_catalog.get_rasterdataset(fn_tif, bbox=rioda.raster.bounds)
     assert np.all(da1 == rioda_utm)
     geom = rioda.raster.box
-    da1 = data_catalog.get_rasterdataset("test", geom=geom)
+    da1 = data_catalog.get_rasterdataset("test.tif", geom=geom)
     assert np.all(da1 == rioda_utm)
-    with pytest.raises(FileNotFoundError, match="No such file or catalog key"):
+    with pytest.raises(FileNotFoundError, match="No such file or catalog source"):
         data_catalog.get_rasterdataset("no_file.tif")
     with pytest.raises(IndexError, match="RasterDataset: No data within"):
-        data_catalog.get_rasterdataset("test", bbox=[40, 50, 41, 51])
+        data_catalog.get_rasterdataset("test.tif", bbox=[40, 50, 41, 51])
 
 
 @pytest.mark.skipif(not compat.HAS_GCSFS, reason="GCSFS not installed.")
@@ -111,16 +111,23 @@ def test_rasterdataset_zoomlevels(rioda_large, tmpdir):
     }
     data_catalog = DataCatalog()
     data_catalog.from_dict(yml_dict)
-    assert data_catalog[name]._parse_zoom_level() == 0  # default to first
-    assert data_catalog[name]._parse_zoom_level(zoom_level=1) == 1
-    assert data_catalog[name]._parse_zoom_level(zoom_level=(0.3, "degree")) == 1
-    assert data_catalog[name]._parse_zoom_level(zoom_level=(0.29, "degree")) == 0
-    assert data_catalog[name]._parse_zoom_level(zoom_level=(0.1, "degree")) == 0
-    assert data_catalog[name]._parse_zoom_level(zoom_level=(1, "meter")) == 0
+    assert data_catalog.get_source(name)._parse_zoom_level() == 0  # default to first
+    assert data_catalog.get_source(name)._parse_zoom_level(zoom_level=1) == 1
+    assert (
+        data_catalog.get_source(name)._parse_zoom_level(zoom_level=(0.3, "degree")) == 1
+    )
+    assert (
+        data_catalog.get_source(name)._parse_zoom_level(zoom_level=(0.29, "degree"))
+        == 0
+    )
+    assert (
+        data_catalog.get_source(name)._parse_zoom_level(zoom_level=(0.1, "degree")) == 0
+    )
+    assert data_catalog.get_source(name)._parse_zoom_level(zoom_level=(1, "meter")) == 0
     with pytest.raises(TypeError, match="zoom_level unit"):
-        data_catalog[name]._parse_zoom_level(zoom_level=(1, "asfd"))
+        data_catalog.get_source(name)._parse_zoom_level(zoom_level=(1, "asfd"))
     with pytest.raises(TypeError, match="zoom_level argument"):
-        data_catalog[name]._parse_zoom_level(zoom_level=(1, "asfd", "asdf"))
+        data_catalog.get_source(name)._parse_zoom_level(zoom_level=(1, "asfd", "asdf"))
 
 
 def test_rasterdataset_driver_kwargs(artifact_data: DataCatalog, tmpdir):
@@ -158,11 +165,11 @@ def test_rasterdataset_driver_kwargs(artifact_data: DataCatalog, tmpdir):
     datacatalog.from_dict(data_dict2)
     era5_nc = datacatalog.get_rasterdataset("era5_nc")
     assert era5_zarr.equals(era5_nc)
-    datacatalog["era5_zarr"].to_file(tmpdir, "era5_zarr", driver="zarr")
+    datacatalog.get_source("era5_zarr").to_file(tmpdir, "era5_zarr", driver="zarr")
 
 
 def test_rasterdataset_unit_attrs(artifact_data: DataCatalog):
-    era5_dict = {"era5": artifact_data.sources["era5"].to_dict()}
+    era5_dict = {"era5": artifact_data.get_source("era5").to_dict()}
     attrs = {
         "temp": {"unit": "degrees C", "long_name": "temperature"},
         "temp_max": {"unit": "degrees C", "long_name": "maximum temperature"},
@@ -175,12 +182,13 @@
     assert raster["temp_max"].attrs["long_name"] == attrs["temp_max"]["long_name"]
 
 
+# @pytest.mark.skip()
 def test_geodataset(geoda, geodf, ts, tmpdir):
     # this test can sometimes hang because of threading issues therefore
     # the synchronous scheduler here is necessary
     from dask import config as dask_config
 
-    dask_config.set(scheduler="synchronous")
+    dask_config.set(scheduler="single-threaded")
     fn_nc = str(tmpdir.join("test.nc"))
     fn_gdf = str(tmpdir.join("test.geojson"))
     fn_csv = str(tmpdir.join("test.csv"))
@@ -196,7 +204,7 @@ def test_geodataset(geoda, geodf, ts, tmpdir):
     ).sortby("index")
     assert np.allclose(da1, geoda)
     assert da1.name == "test1"
-    ds1 = data_catalog.get_geodataset("test", single_var_as_array=False)
+    ds1 = data_catalog.get_geodataset("test.nc", single_var_as_array=False)
     assert isinstance(ds1, xr.Dataset)
     assert "test" in ds1
     da2 = data_catalog.get_geodataset(
@@ -209,7 +217,7 @@
     ).sortby("index")
     assert np.allclose(da3, geoda)
     assert da3.vector.crs.to_epsg() == 4326
-    with pytest.raises(FileNotFoundError, match="No such file or catalog key"):
+    with pytest.raises(FileNotFoundError, match="No such file or catalog source"):
         data_catalog.get_geodataset("no_file.geojson")
     # Test nc file writing to file
     with tempfile.TemporaryDirectory() as td:
@@ -223,7 +231,7 @@
 def test_geodataset_unit_attrs(artifact_data: DataCatalog):
-    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.sources["gtsmv3_eu_era5"].to_dict()}
+    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.get_source("gtsmv3_eu_era5").to_dict()}
     attrs = {
         "waterlevel": {
             "long_name": "sea surface height above mean sea level",
@@ -239,7 +247,7 @@
 def test_geodataset_unit_conversion(artifact_data: DataCatalog):
     gtsm_geodataarray = artifact_data.get_geodataset("gtsmv3_eu_era5")
-    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.sources["gtsmv3_eu_era5"].to_dict()}
+    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.get_source("gtsmv3_eu_era5").to_dict()}
     gtsm_dict["gtsmv3_eu_era5"].update(dict(unit_mult=dict(waterlevel=1000)))
     datacatalog = DataCatalog()
     datacatalog.from_dict(gtsm_dict)
@@ -248,7 +256,7 @@
 def test_geodataset_set_nodata(artifact_data: DataCatalog):
-    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.sources["gtsmv3_eu_era5"].to_dict()}
+    gtsm_dict = {"gtsmv3_eu_era5": artifact_data.get_source("gtsmv3_eu_era5").to_dict()}
     gtsm_dict["gtsmv3_eu_era5"].update(dict(nodata=-99))
     datacatalog = DataCatalog()
     datacatalog.from_dict(gtsm_dict)
@@ -264,15 +272,15 @@ def test_geodataframe(geodf, tmpdir):
     assert isinstance(gdf1, gpd.GeoDataFrame)
     assert np.all(gdf1 == geodf)
     gdf1 = data_catalog.get_geodataframe(
-        "test", bbox=geodf.total_bounds, buffer=1000, rename={"test": "test1"}
+        "test.geojson", bbox=geodf.total_bounds, buffer=1000, rename={"test": "test1"}
     )
     assert np.all(gdf1 == geodf)
-    with pytest.raises(FileNotFoundError, match="No such file or catalog key"):
+    with pytest.raises(FileNotFoundError, match="No such file or catalog source"):
         data_catalog.get_geodataframe("no_file.geojson")
 
 
 def test_geodataframe_unit_attrs(artifact_data: DataCatalog):
-    gadm_level1 = {"gadm_level1": artifact_data.sources["gadm_level1"].to_dict()}
+    gadm_level1 = {"gadm_level1": artifact_data.get_source("gadm_level1").to_dict()}
     attrs = {"NAME_0": {"long_name": "Country names"}}
     gadm_level1["gadm_level1"].update(dict(attrs=attrs))
     artifact_data.from_dict(gadm_level1)
diff --git a/tests/test_data_catalog.py b/tests/test_data_catalog.py
index 99bd97e30..4b1bddfcf 100644
--- a/tests/test_data_catalog.py
+++ b/tests/test_data_catalog.py
@@ -10,41 +10,41 @@
 import xarray as xr
 
 from hydromt.data_adapter import DataAdapter, RasterDatasetAdapter
-from hydromt.data_catalog import DataCatalog, _parse_data_dict
+from hydromt.data_catalog import (
+    DataCatalog,
+    _denormalise_data_dict,
+    _parse_data_source_dict,
+)
 
 CATALOGDIR = join(dirname(abspath(__file__)), "..", "data", "catalogs")
+DATADIR = join(dirname(abspath(__file__)), "data")
 
 
 def test_parser():
     # valid abs root on windows and linux!
     root = "c:/root" if os.name == "nt" else "/c/root"
     # simple; abs path
-    dd = {
-        "test": {
-            "data_type": "RasterDataset",
-            "path": f"{root}/to/data.tif",
-        }
+    source = {
+        "data_type": "RasterDataset",
+        "path": f"{root}/to/data.tif",
     }
-    dd_out = _parse_data_dict(dd, root=root)
-    assert isinstance(dd_out["test"], RasterDatasetAdapter)
-    assert dd_out["test"].path == abspath(dd["test"]["path"])
+    adapter = _parse_data_source_dict("test", source, root=root)
+    assert isinstance(adapter, RasterDatasetAdapter)
+    assert adapter.path == abspath(source["path"])
     # test with Path object
-    dd["test"].update(path=Path(dd["test"]["path"]))
-    dd_out = _parse_data_dict(dd, root=root)
-    assert dd_out["test"].path == abspath(dd["test"]["path"])
+    source.update(path=Path(source["path"]))
+    adapter = _parse_data_source_dict("test", source, root=root)
+    assert adapter.path == abspath(source["path"])
    # rel path
-    dd = {
-        "test": {
-            "data_type": "RasterDataset",
-            "path": "path/to/data.tif",
-            "kwargs": {"fn": "test"},
-        },
-        "root": root,
+    source = {
+        "data_type": "RasterDataset",
+        "path": "path/to/data.tif",
+        "kwargs": {"fn": "test"},
     }
-    dd_out = _parse_data_dict(dd)
-    assert dd_out["test"].path == abspath(join(root, dd["test"]["path"]))
+    adapter = _parse_data_source_dict("test", source, root=root)
+    assert adapter.path == abspath(join(root, source["path"]))
     # check if path in kwargs is also absolute
-    assert dd_out["test"].driver_kwargs["fn"] == abspath(join(root, "test"))
+    assert adapter.driver_kwargs["fn"] == abspath(join(root, "test"))
     # alias
     dd = {
         "test": {
@@ -53,8 +53,13 @@
         },
         "test1": {"alias": "test"},
     }
-    dd_out = _parse_data_dict(dd, root=root)
-    assert dd_out["test"].path == dd_out["test1"].path
+    with pytest.deprecated_call():
+        sources = _denormalise_data_dict(dd)
+    assert len(sources) == 2
+    for name, source in sources:
+        adapter = _parse_data_source_dict(name, source, root=root, catalog_name="tmp")
+        assert adapter.path == abspath(join(root, dd["test"]["path"]))
+        assert adapter.catalog_name == "tmp"
     # placeholder
     dd = {
         "test_{p1}_{p2}": {
@@ -63,18 +68,40 @@
             "data_type": "RasterDataset",
             "path": "data_{p2}.tif",
             "placeholders": {"p1": ["a", "b"], "p2": ["1", "2", "3"]},
         },
     }
-    dd_out = _parse_data_dict(dd, root=root)
-    assert len(dd_out) == 6
-    assert dd_out["test_a_1"].path == abspath(join(root, "data_1.tif"))
-    assert "placeholders" not in dd_out["test_a_1"].to_dict()
+    sources = _denormalise_data_dict(dd)
+    assert len(sources) == 6
+    for name, source in sources:
+        assert "placeholders" not in source
+        adapter = _parse_data_source_dict(name, source, root=root)
+        assert adapter.path == abspath(join(root, f"data_{name[-1]}.tif"))
+    # variants
+    dd = {
+        "test": {
+            "data_type": "RasterDataset",
+            "variants": [
+                {"path": "path/to/data1.tif", "version": "1"},
+                {"path": "path/to/data2.tif", "provider": "local"},
+            ],
+        },
+    }
+    sources = _denormalise_data_dict(dd)
+    assert len(sources) == 2
+    for i, (name, source) in enumerate(sources):
+        assert "variants" not in source
+        adapter = _parse_data_source_dict(name, source, root=root, catalog_name="tmp")
+        assert adapter.version == dd["test"]["variants"][i].get("version", None)
+        assert adapter.provider == dd["test"]["variants"][i].get("provider", None)
+        assert adapter.catalog_name == "tmp"
     # errors
     with pytest.raises(ValueError, match="Missing required path argument"):
-        _parse_data_dict({"test": {}})
+        _parse_data_source_dict("test", {})
     with pytest.raises(ValueError, match="Data type error unknown"):
-        _parse_data_dict({"test": {"path": "", "data_type": "error"}})
-    with pytest.raises(ValueError, match="alias test not found in data_dict"):
-        _parse_data_dict({"test1": {"alias": "test"}})
+        _parse_data_source_dict("test", {"path": "", "data_type": "error"})
+    with pytest.raises(
+        ValueError, match="alias test not found in data_dict"
+    ), pytest.deprecated_call():
+        _denormalise_data_dict({"test1": {"alias": "test"}})
 
 
 def test_data_catalog_io(tmpdir):
@@ -89,21 +116,99 @@
     fn_yml = join(tmpdir, "test1.yml")
     DataCatalog(fallback_lib=None).to_yml(fn_yml)
     # test print
-    print(data_catalog["merit_hydro"])
+    print(data_catalog.get_source("merit_hydro"))
+
+
+def test_versioned_catalogs(tmpdir):
+    # make sure the catalogs individually still work
+    legacy_yml_fn = join(DATADIR, "legacy_esa_worldcover.yml")
+    legacy_data_catalog = DataCatalog(data_libs=[legacy_yml_fn])
+    assert len(legacy_data_catalog) == 1
+    source = legacy_data_catalog.get_source("esa_worldcover")
+    assert Path(source.path).name == "esa-worldcover.vrt"
+    assert source.version == "2020"
+    # test round trip to and from dict
+    legacy_data_catalog2 = DataCatalog().from_dict(legacy_data_catalog.to_dict())
+    assert legacy_data_catalog2 == legacy_data_catalog
+    # make sure we raise deprecation warning here
+    with pytest.deprecated_call():
+        _ = legacy_data_catalog["esa_worldcover"]
+
+    # second catalog
+    aws_yml_fn = join(DATADIR, "aws_esa_worldcover.yml")
+    aws_data_catalog = DataCatalog(data_libs=[aws_yml_fn])
+    assert len(aws_data_catalog) == 1
+    # test get_source with all keyword combinations
+    source = aws_data_catalog.get_source("esa_worldcover")
+    assert source.path.endswith("ESA_WorldCover_10m_2020_v100_Map_AWS.vrt")
+    assert source.version == "2021"
+    source = aws_data_catalog.get_source("esa_worldcover", version=2021)
+    assert source.path.endswith("ESA_WorldCover_10m_2020_v100_Map_AWS.vrt")
+    assert source.version == "2021"
+    source = aws_data_catalog.get_source("esa_worldcover", version=2021, provider="aws")
+    assert source.path.endswith("ESA_WorldCover_10m_2020_v100_Map_AWS.vrt")
+    # test round trip to and from dict
+    aws_data_catalog2 = DataCatalog().from_dict(aws_data_catalog.to_dict())
+    assert aws_data_catalog2 == aws_data_catalog
+
+    # test errors
+    with pytest.raises(KeyError):
+        aws_data_catalog.get_source("esa_worldcover", version=2021, provider="asdfasdf")
+    with pytest.raises(KeyError):
+        aws_data_catalog.get_source(
+            "esa_worldcover", version="asdfasdf", provider="aws"
+        )
+    with pytest.raises(KeyError):
+        aws_data_catalog.get_source("asdfasdf", version=2021, provider="aws")
+
+    # make sure we trigger user warning when overwriting versions
+    with pytest.warns(UserWarning):
+        aws_data_catalog.from_yml(aws_yml_fn)
+
+    # make sure we can read merged catalogs
+    merged_yml_fn = join(DATADIR, "merged_esa_worldcover.yml")
+    merged_catalog = DataCatalog(data_libs=[merged_yml_fn])
+    assert len(merged_catalog) == 3
+    source_aws = merged_catalog.get_source("esa_worldcover")  # last variant is default
+    assert source_aws.filesystem == "s3"
+    assert merged_catalog.get_source("esa_worldcover", provider="aws") == source_aws
+    source_loc = merged_catalog.get_source("esa_worldcover", provider="local")
+    assert source_loc != source_aws
+    assert source_loc.filesystem == "local"
+    assert source_loc.version == "2021"  # get newest version
+    # test get_source with version only
+    assert merged_catalog.get_source("esa_worldcover", version=2021) == source_loc
+    # test round trip to and from dict
+    merged_catalog2 = DataCatalog().from_dict(merged_catalog.to_dict())
+    assert merged_catalog2 == merged_catalog
+
+    # Make sure we can query for the version we want
+    aws_and_legacy_catalog = DataCatalog(data_libs=[legacy_yml_fn, aws_yml_fn])
+    assert len(aws_and_legacy_catalog) == 2
+    source_aws = aws_and_legacy_catalog.get_source("esa_worldcover")
+    assert source_aws.filesystem == "s3"
+    source_aws2 = aws_and_legacy_catalog.get_source("esa_worldcover", provider="aws")
+    assert source_aws2 == source_aws
+    source_loc = aws_and_legacy_catalog.get_source(
+        "esa_worldcover", provider="legacy_esa_worldcover"  # provider is filename
+    )
+    assert Path(source_loc.path).name == "esa-worldcover.vrt"
+    # test round trip to and from dict
+    aws_and_legacy_catalog2 = DataCatalog().from_dict(aws_and_legacy_catalog.to_dict())
+    assert aws_and_legacy_catalog2 == aws_and_legacy_catalog
 
 
-@pytest.mark.filterwarnings('ignore:"from_artifacts" is deprecated:DeprecationWarning')
 def test_data_catalog(tmpdir):
-    data_catalog = DataCatalog(data_libs=None)  # NOTE: legacy code!
+    data_catalog = DataCatalog(data_libs=None)  # initialized with empty dict
     assert len(data_catalog._sources) == 0
     # global data sources from artifacts are automatically added
     assert len(data_catalog.sources) > 0
     # test keys, getitem,
-    keys = data_catalog.keys
-    source = data_catalog[keys[0]]
+    keys = [key for key, _ in data_catalog.iter_sources()]
+    source = data_catalog.get_source(keys[0])
     assert isinstance(source, DataAdapter)
-    assert keys[0] in data_catalog
+    assert keys[0] in data_catalog.get_source_names()
     # add source from dict
     data_dict = {keys[0]: source.to_dict()}
     data_catalog.from_dict(data_dict)
@@ -111,15 +216,19 @@
     assert isinstance(data_catalog._repr_html_(), str)
     assert isinstance(data_catalog.to_dataframe(), pd.DataFrame)
     with pytest.raises(ValueError, match="Value must be DataAdapter"):
-        data_catalog["test"] = "string"
+        data_catalog.add_source("test", "string")
     # check that no sources are loaded if fallback_lib is None
     assert not DataCatalog(fallback_lib=None).sources
     # test artifact keys (NOTE: legacy code!)
-    data_catalog = DataCatalog(deltares_data=False)
+    with pytest.deprecated_call():
+        data_catalog = DataCatalog(deltares_data=False)
     assert len(data_catalog._sources) == 0
-    data_catalog.from_artifacts("deltares_data")
+    with pytest.deprecated_call():
+        data_catalog.from_artifacts("deltares_data")
     assert len(data_catalog._sources) > 0
-    with pytest.raises(IOError, match="URL b'404: Not Found'"):
+    with pytest.raises(
+        IOError, match="URL b'404: Not Found'"
+    ), pytest.deprecated_call():
         data_catalog = DataCatalog(deltares_data="unknown_version")
 
     # test hydromt version in meta data
@@ -138,8 +247,10 @@ def test_from_archive(tmpdir):
         data_catalog.predefined_catalogs["artifact_data"]["versions"].values()
     )[0]
     data_catalog.from_archive(urlpath.format(version=version_hash))
-    assert len(data_catalog._sources) > 0
-    source0 = data_catalog._sources[[k for k in data_catalog.sources.keys()][0]]
+    assert len(data_catalog.iter_sources()) > 0
+    source0 = data_catalog.get_source(
+        next(iter([source_name for source_name, _ in data_catalog.iter_sources()]))
+    )
     assert ".hydromt_data" in str(source0.path)
     # failed to download
     with pytest.raises(ConnectionError, match="Data download failed"):
@@ -201,7 +312,7 @@ def test_export_global_datasets(tmpdir):
     assert yml_list[2].strip().startswith("root:")
     # check if data is parsed correctly
     data_catalog1 = DataCatalog(data_lib_fn)
-    for key, source in data_catalog1.sources.items():
+    for key, source in data_catalog1.iter_sources():
         source_type = type(source).__name__
         dtypes = DTYPES[source_type]
         obj = source.get_data()
@@ -245,55 +356,96 @@ def test_export_dataframe(tmpdir, df, df_time):
         bbox=[11.70, 45.35, 12.95, 46.70],
     )
     data_catalog1 = DataCatalog(str(tmpdir.join("data_catalog.yml")))
-    assert len(data_catalog1) == 1
+    assert len(data_catalog1.iter_sources()) == 1
     data_catalog.export_data(str(tmpdir))
     data_catalog1 = DataCatalog(str(tmpdir.join("data_catalog.yml")))
-    assert len(data_catalog1) == 2
-    for key, source in data_catalog1.sources.items():
+    assert len(data_catalog1.iter_sources()) == 2
+    for key, source in data_catalog1.iter_sources():
         dtypes = pd.DataFrame
         obj = source.get_data()
         assert isinstance(obj, dtypes), key
 
 
-def test_get_data(df):
+def test_get_data(df, tmpdir):
     data_catalog = DataCatalog("artifact_data")  # read artifacts
-
+    n = len(data_catalog)
     # raster dataset using three different ways
-    da = data_catalog.get_rasterdataset(data_catalog["koppen_geiger"].path)
+    name = "koppen_geiger"
+    da = data_catalog.get_rasterdataset(data_catalog.get_source(name).path)
+    assert len(data_catalog) == n + 1
     assert isinstance(da, xr.DataArray)
-    da = data_catalog.get_rasterdataset("koppen_geiger")
+    da = data_catalog.get_rasterdataset(name, provider="artifact_data")
     assert isinstance(da, xr.DataArray)
     da = data_catalog.get_rasterdataset(da)
     assert isinstance(da, xr.DataArray)
+    data = {"source": name, "provider": "artifact_data"}
+    da = data_catalog.get_rasterdataset(data)
     with pytest.raises(ValueError, match='Unknown raster data type "list"'):
         data_catalog.get_rasterdataset([])
+    with pytest.raises(FileNotFoundError):
+        data_catalog.get_rasterdataset("test1.tif")
+    with pytest.raises(ValueError, match="Unknown keys in requested data"):
+        data_catalog.get_rasterdataset({"name": "test"})
 
     # vector dataset using three different ways
-    gdf = data_catalog.get_geodataframe(data_catalog["osm_coastlines"].path)
+    name = "osm_coastlines"
+    gdf = data_catalog.get_geodataframe(data_catalog.get_source(name).path)
+    assert len(data_catalog) == n + 2
     assert isinstance(gdf, gpd.GeoDataFrame)
-    gdf = data_catalog.get_geodataframe("osm_coastlines")
+    gdf = data_catalog.get_geodataframe(name, provider="artifact_data")
     assert isinstance(gdf, gpd.GeoDataFrame)
     gdf = data_catalog.get_geodataframe(gdf)
     assert isinstance(gdf, gpd.GeoDataFrame)
+    data = {"source": name, "provider": "artifact_data"}
+    gdf = data_catalog.get_geodataframe(data)
+    assert isinstance(gdf, gpd.GeoDataFrame)
     with pytest.raises(ValueError, match='Unknown vector data type "list"'):
         data_catalog.get_geodataframe([])
+    with pytest.raises(FileNotFoundError):
+        data_catalog.get_geodataframe("test1.gpkg")
+    with pytest.raises(ValueError, match="Unknown keys in requested data"):
+        data_catalog.get_geodataframe({"name": "test"})
 
     # geodataset using three different ways
-    da = data_catalog.get_geodataset(data_catalog["gtsmv3_eu_era5"].path)
+    name = "gtsmv3_eu_era5"
+    da = data_catalog.get_geodataset(data_catalog.get_source(name).path)
+    assert len(data_catalog) == n + 3
     assert isinstance(da, xr.DataArray)
-    da = data_catalog.get_geodataset("gtsmv3_eu_era5")
+    da = data_catalog.get_geodataset(name, provider="artifact_data")
     assert isinstance(da, xr.DataArray)
     da = data_catalog.get_geodataset(da)
     assert isinstance(da, xr.DataArray)
+    data = {"source": name, "provider": "artifact_data"}
+    gdf = data_catalog.get_geodataset(data)
+    assert isinstance(gdf, xr.DataArray)
     with pytest.raises(ValueError, match='Unknown geo data type "list"'):
         data_catalog.get_geodataset([])
+    with pytest.raises(FileNotFoundError):
+        data_catalog.get_geodataset("test1.nc")
+    with pytest.raises(ValueError, match="Unknown keys in requested data"):
+        data_catalog.get_geodataset({"name": "test"})
 
     # dataframe using single way
+    name = "test.csv"
+    fn = str(tmpdir.join(name))
+    df.to_csv(fn)
+    df = data_catalog.get_dataframe(fn, driver_kwargs=dict(index_col=0))
+    assert len(data_catalog) == n + 4
+    assert isinstance(df, pd.DataFrame)
+    df = data_catalog.get_dataframe(name, provider="local")
+    assert isinstance(df, pd.DataFrame)
     df = data_catalog.get_dataframe(df)
     assert isinstance(df, pd.DataFrame)
+    data = {"source": name, "provider": "local"}
+    gdf = data_catalog.get_dataframe(data)
+    assert isinstance(gdf, pd.DataFrame)
     with pytest.raises(ValueError, match='Unknown tabular data type "list"'):
         data_catalog.get_dataframe([])
+    with pytest.raises(FileNotFoundError):
+        data_catalog.get_dataframe("test1.csv")
+    with pytest.raises(ValueError, match="Unknown keys in requested data"):
+        data_catalog.get_dataframe({"name": "test"})
 
 
 def test_deprecation_warnings(artifact_data):
diff --git a/tests/test_flw.py b/tests/test_flw.py
index 061a76917..aac0324ac 100644
--- a/tests/test_flw.py
+++ b/tests/test_flw.py
@@ -62,18 +62,12 @@ def test_reproject_flwdir(hydds, demda):
     assert "flwdir" in hydds1
     assert hydds1.raster.shape == demda_reproj.raster.shape
     assert np.allclose(hydds["uparea"].max(), hydds1["uparea"].max())
-    # downscaled subdomain
-    demda_clip = demda_reproj.isel(y=slice(0, demda.raster.shape[0]))
-    hydds1 = flw.reproject_hydrography_like(
-        hydds, demda_clip, river_upa=0.05, outlets="min"
-    )
-    assert hydds1.raster.shape == demda_clip.raster.shape
     # ~ 5% error is acceptable; test also exact value for precise unit testing
     assert abs(1 - hydds["uparea"].max() / hydds1["uparea"].max()) < 0.05
-    assert np.isclose(hydds1["uparea"].max(), 1.53750026)
+    assert np.isclose(hydds1["uparea"].max(), 1.5)
     # error
     with pytest.raises(ValueError, match="uparea variable not found"):
-        flw.reproject_hydrography_like(hydds.drop_vars("uparea"), demda_clip)
+        flw.reproject_hydrography_like(hydds.drop_vars("uparea"), demda)
 
 
 def test_basin_map(hydds, flwdir):
diff --git a/tests/test_model.py b/tests/test_model.py
index ac41f2806..5923fbfdf 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -125,13 +125,11 @@ def test_write_data_catalog(tmpdir):
     assert list(DataCatalog(data_lib_fn).sources.keys()) == sources[:2]
 
 
-@pytest.mark.filterwarnings(
-    'ignore:Defining "region" based on staticmaps:DeprecationWarning'
-)
 def test_model(model, tmpdir):
     # Staticmaps -> moved from _test_model_api as it is deprecated
     model._API.update({"staticmaps": xr.Dataset})
-    non_compliant = model._test_model_api()
+    with pytest.deprecated_call():
+        non_compliant = model._test_model_api()
     assert len(non_compliant) == 0, non_compliant
     # write model
     model.set_root(str(tmpdir), mode="w")
@@ -140,16 +138,19 @@ def test_model(model, tmpdir):
     model.read()
 
     # read model
     model1 = Model(str(tmpdir), mode="r")
-    model1.read()
+    with pytest.deprecated_call():
+        model1.read()
     with pytest.raises(IOError, match="Model opened in read-only mode"):
         model1.write()
     # check if equal
     model._results = {}  # reset results for comparison
-    equal, errors = model._test_equal(model1)
+    with pytest.deprecated_call():
+        equal, errors = model._test_equal(model1)
     assert equal, errors
     # read region from staticmaps
     model._geoms.pop("region")
-    assert np.all(model.region.total_bounds == model.staticmaps.raster.bounds)
+    with pytest.deprecated_call():
+        assert np.all(model.region.total_bounds == model.staticmaps.raster.bounds)
 
 
 @pytest.mark.filterwarnings("ignore:The setup_basemaps")
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 000000000..16ae7443d
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,104 @@
+"""Testing for the internal hydromt utility functions."""
+from hydromt.utils import partition_dictionaries
+
+
+def test_flat_dict_partition():
+    left = {"a": 1, "b": 2, "pi": 3.14}
+    right = {"a": 1, "b": 2, "e": 2.71}
+    common, left_less_right, right_less_left = partition_dictionaries(left, right)
+    assert common == {"a": 1, "b": 2}
+    assert left_less_right == {"pi": 3.14}
+    assert right_less_left == {"e": 2.71}
+
+
+def test_nested_disjoint_leaves():
+    left = {"a": 1, "b": 2, "maths": {"constants": {"pi": 3.14}}}
+    right = {"a": 1, "b": 2, "maths": {"constants": {"e": 2.71}}}
+    common, left_less_right, right_less_left = partition_dictionaries(left, right)
+    assert common == {"a": 1, "b": 2, "maths": {"constants": {}}}
+    assert left_less_right == {"maths": {"constants": {"pi": 3.14}}}
+    assert right_less_left == {"maths": {"constants": {"e": 2.71}}}
+
+
+def test_nested_common_siblings():
+    left = {
+        "a": 1,
+        "b": 2,
+        "maths": {
+            "constants": {"pi": 3.14},
+            "integration": {"numeric": None, "analytic": None},
+        },
+    }
+    right = {
+        "a": 1,
+        "b": 2,
+        "maths": {
+            "constants": {"e": 2.71},
+            "integration": {"numeric": None, "analytic": None},
+        },
+    }
+    common, left_less_right, right_less_left = partition_dictionaries(left, right)
+    assert common == {
+        "a": 1,
+        "b": 2,
+        "maths": {"constants": {}, "integration": {"numeric": None, "analytic": None}},
+    }
+    assert left_less_right == {"maths": {"constants": {"pi": 3.14}}}
+    assert right_less_left == {"maths": {"constants": {"e": 2.71}}}
+
+
+def test_nested_key_conflict():
+    left = {
+        "a": 1,
+        "b": 2,
+        "c": 3,
+        "d": 4,
+        "e": 5,
+        "maths": {"constants": {"pi": 3.14}},
+    }
+    right = {"a": 1, "b": 2, "c": 3, "d": 4, "maths": {"constants": {"e": 2.71}}}
+
+    common, left_less_right, right_less_left = partition_dictionaries(left, right)
+
+    assert common == {"a": 1, "b": 2, "c": 3, "d": 4, "maths": {"constants": {}}}
+    assert left_less_right == {
+        "e": 5,
+        "maths": {"constants": {"pi": 3.14}},
+    }
+    assert right_less_left == {
+        "maths": {"constants": {"e": 2.71}},
+    }
+
+
+def test_common_ancestory_distinct_children():
+    left = {
+        "a": {"i": -1, "ii": -2, "iii": -3},
+        "b": 2,
+        "c": 3,
+        "d": 4,
+        "e": 5,
+        "maths": {"constants": {"pi": 3.14}},
+    }
+    right = {
+        "a": {"i": -1, "ii": -2, "iii": -3},
+        "b": 2,
+        "c": 3,
+        "d": 4,
+        "maths": {"constants": {"e": 2.71}},
+    }
+
+    common, left_less_right, right_less_left = partition_dictionaries(left, right)
+    assert common == {
+        "a": {"i": -1, "ii": -2, "iii": -3},
+        "b": 2,
+        "c": 3,
+        "d": 4,
+        "maths": {"constants": {}},
+    }
+    assert left_less_right == {
+        "e": 5,
+        "maths": {"constants": {"pi": 3.14}},
+    }
+    assert right_less_left == {
+        "maths": {"constants": {"e": 2.71}},
+    }