
Commit

add parquet-support (#445)
savente93 authored Jul 31, 2023
1 parent f55aaee commit 50e50b5
Showing 11 changed files with 123 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/changelog.rst
@@ -13,6 +13,7 @@ Added
-----
- docs now include a dropdown for selecting older versions of the docs. (#457)
- Support for loading the same data source but from different places (e.g. local & aws)
- Add support for reading and writing tabular data in ``parquet`` format. (PR #445)

Changed
-------
15 changes: 14 additions & 1 deletion docs/user_guide/data_types.rst
@@ -270,7 +270,7 @@ Vector data (GeoDataFrame)
- :py:meth:`~hydromt.io.open_vector`
- Point, Line and Polygon geometries. Uses :py:func:`geopandas.read_file`
* - ``vector_table``
- CSV, XY, and EXCEL.
- CSV, XY, PARQUET, and EXCEL.
- :py:meth:`~hydromt.io.open_vector`
- Point geometries only. Uses :py:meth:`~hydromt.io.open_vector_from_table`

@@ -350,6 +350,15 @@ options.
driver_kwargs:
driver: csv
.. _binary_vector:

HydroMT also supports reading and writing vector data in binary formats. Currently only parquet is
supported, but others could be added if desired. The structure of the files should be the same as
that of the text format files described above, but written according to the parquet file spec. Since
this is a binary format, no examples are provided, but pandas, for example, can write the same data
structure to parquet as it can to csv.
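
For illustration, a minimal sketch of producing such a file with pandas (assuming a plain point
table with ``x``/``y`` columns, a hypothetical file name, and a parquet engine such as pyarrow
installed):

.. code-block:: python

    import pandas as pd

    # same tabular layout as the csv examples above: an index, x/y coordinate
    # columns and any number of attribute columns
    df = pd.DataFrame({"x": [4.5, 5.1], "y": [52.0, 52.3], "name": ["a", "b"]})
    df.to_parquet("waterlevel_locs.parquet")  # binary counterpart of df.to_csv(...)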


.. _GeoDataset:

Geospatial point time-series (GeoDataset)
@@ -442,6 +451,7 @@ separate (text) files are parsed to **GeoDataset** using the **vector** driver.
The GeoDataset must at least contain a location index with point geometries, which is referred to by the ``path`` argument.
The path may refer to both GIS vector data such as GeoJSON with only Point geometries
or tabulated point vector data such as csv files, see earlier examples for GeoDataFrame datasets.
Finally, certain binary formats such as parquet are also supported.
In addition, a tabulated time-series text file can be passed to be used as a variable of the GeoDataset.
This data is added by a second file which is referred to using the ``fn_data`` key-word argument.
The index of the time-series (in the columns header) and point locations must match.
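
A rough sketch of that pattern (hypothetical file names; assuming a parquet point table with
``x``/``y`` columns and a csv time-series whose column headers match the location index):

.. code-block:: python

    from hydromt import io

    # point locations in parquet, per-location time series in csv
    ds = io.open_geodataset(
        "stations.parquet",
        fn_data="discharge.csv",
        crs=4326,
    )
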
@@ -492,6 +502,9 @@ read the time stamps the :py:func:`pandas.to_datetime` method is used.
- Excel files
- :py:func:`pandas.read_excel`
- If required, provide a sheet name through driver_kwargs
* - ``parquet``
- Binary encoded columnar data format
- :py:func:`pandas.read_parquet`
* - ``fwf``
- Fixed width delimited text files
- :py:func:`pandas.read_fwf`
13 changes: 9 additions & 4 deletions hydromt/data_adapter/dataframe.py
@@ -54,10 +54,10 @@ def __init__(
Path to data source. If the dataset consists of multiple files, the path may
contain {variable}, {year}, {month} placeholders as well as a path
search pattern using a '*' wildcard.
driver: {'csv', 'xlsx', 'xls', 'fwf'}, optional
driver: {'csv', 'parquet', 'xlsx', 'xls', 'fwf'}, optional
Driver to read files with, for 'csv' :py:func:`~pandas.read_csv`,
for {'xlsx', 'xls'} :py:func:`~pandas.read_excel`, and for 'fwf'
:py:func:`~pandas.read_fwf`.
for 'parquet' :py:func:`~pandas.read_parquet`, for {'xlsx', 'xls'}
:py:func:`~pandas.read_excel`, and for 'fwf' :py:func:`~pandas.read_fwf`.
By default the driver is inferred from the file extension and falls back to
'csv' if unknown.
filesystem: {'local', 'gcs', 's3'}, optional
@@ -131,7 +131,7 @@ def to_file(
data_name : str
Name of the output file without extension.
driver : str, optional
Driver to write the file, e.g., 'csv', 'excel'. If None,
Driver to write the file, e.g., 'csv', 'parquet', 'excel'. If None,
the default behavior is used.
variables : list of str, optional
Names of DataFrame columns to include in the output. By default,
@@ -169,6 +169,9 @@ def to_file(
driver = "csv"
fn_out = join(data_root, f"{data_name}.csv")
obj.to_csv(fn_out, **kwargs)
elif driver == "parquet":
fn_out = join(data_root, f"{data_name}.parquet")
obj.to_parquet(fn_out, **kwargs)
elif driver == "excel":
fn_out = join(data_root, f"{data_name}.xlsx")
obj.to_excel(fn_out, **kwargs)
@@ -208,6 +211,8 @@ def get_data(

if self.driver in ["csv"]:
df = pd.read_csv(self.path, **kwargs)
elif self.driver == "parquet":
df = pd.read_parquet(self.path, **kwargs)
elif self.driver in ["xls", "xlsx", "excel"]:
df = pd.read_excel(self.path, engine="openpyxl", **kwargs)
elif self.driver in ["fwf"]:
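
A hedged usage sketch of the new driver (mirroring the parquet round-trip added to the tests in
this PR; the file name is hypothetical and a parquet engine such as pyarrow must be installed):

    import pandas as pd
    from hydromt import DataCatalog

    df = pd.DataFrame({"landuse": [10, 20], "roughness": [0.1, 0.3]})
    df.to_parquet("params.parquet")

    cat = DataCatalog()
    # driver="parquet" dispatches to pd.read_parquet inside DataFrameAdapter.get_data
    df2 = cat.get_dataframe("params.parquet", driver="parquet")
    pd.testing.assert_frame_equal(df, df2)
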
27 changes: 25 additions & 2 deletions hydromt/data_adapter/geodataframe.py
@@ -27,6 +27,7 @@ class GeoDataFrameAdapter(DataAdapter):
_DRIVERS = {
"xy": "xy",
"csv": "csv",
"parquet": "parquet",
"xls": "xls",
"xlsx": "xlsx",
}
@@ -169,7 +170,7 @@ def to_file(
return None, None

if driver is None:
_lst = ["csv", "xls", "xlsx", "xy", "vector_table"]
_lst = ["csv", "parquet", "xls", "xlsx", "xy", "vector_table"]
driver = "csv" if self.driver in _lst else "GPKG"
# always write netcdf
if driver == "csv":
@@ -181,6 +182,15 @@
)
gdf["x"], gdf["y"] = gdf.geometry.x, gdf.geometry.y
gdf.drop(columns="geometry").to_csv(fn_out, **kwargs)
elif driver == "parquet":
fn_out = join(data_root, f"{data_name}.parquet")
if not np.all(gdf.geometry.type == "Point"):
raise ValueError(
f"{data_name} contains other geometries than 'Point' "
"which cannot be written to parquet."
)
gdf["x"], gdf["y"] = gdf.geometry.x, gdf.geometry.y
gdf.drop(columns="geometry").to_parquet(fn_out, **kwargs)
else:
driver_extensions = {
"ESRI Shapefile": ".shp",
@@ -240,10 +250,23 @@

# read and clip
logger.info(f"GeoDataFrame: Read {self.driver} data{clip_str}.")
if self.driver in ["csv", "xls", "xlsx", "xy", "vector", "vector_table"]:
if self.driver in [
"csv",
"parquet",
"xls",
"xlsx",
"xy",
"vector",
"vector_table",
]:
# "csv", "xls", "xlsx", "xy" deprecated use vector_table instead.
# specific driver should be added to open_vector kwargs
if "driver" not in kwargs and self.driver in ["csv", "xls", "xlsx", "xy"]:
warnings.warn(
"using the driver setting is deprecated. Please use"
"vector_table instead."
)

kwargs.update(driver=self.driver)
# Check if file-object is required because of additional options
gdf = io.open_vector(
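
As a standalone sketch of what the new parquet branch of to_file does (this is not the adapter
itself): point geometries are flattened to plain x/y columns before writing, exactly as in the
existing csv branch.

    import geopandas as gpd
    from shapely.geometry import Point

    gdf = gpd.GeoDataFrame(
        {"station": ["a", "b"]},
        geometry=[Point(4.5, 52.0), Point(5.1, 52.3)],
        crs=4326,
    )
    # only Point geometries can be flattened; other geometry types raise in the adapter
    assert (gdf.geometry.type == "Point").all()
    gdf["x"], gdf["y"] = gdf.geometry.x, gdf.geometry.y
    gdf.drop(columns="geometry").to_parquet("stations.parquet")
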
51 changes: 31 additions & 20 deletions hydromt/io.py
@@ -2,7 +2,7 @@
import glob
import io
import logging
from os.path import abspath, basename, dirname, isfile, join
from os.path import abspath, basename, dirname, isfile, join, splitext
from pathlib import Path

import dask
@@ -309,11 +309,11 @@ def open_geodataset(
fn_data: path, str
Path to a data file whose index dimension should match the geospatial
coordinates index.
This can either be a csv with datetime in the first column and the location
index in the header row, or a netcdf with a time and index dimensions.
This can either be a csv or parquet file with datetime in the first column and the
location index in the header row, or a netcdf with time and index dimensions.
var_name: str, optional
Name of the variable in case of a csv fn_data file. By default, None and
infered from basename.
Name of the variable in case of a csv or parquet fn_data file. By default
None, and inferred from the basename.
crs: str, `pyproj.CRS`, or dict
Source coordinate reference system, ignored for files with a native crs.
bbox : array of float, default None
@@ -361,7 +361,7 @@
def open_timeseries_from_table(
fn, name=None, index_dim="index", logger=logger, **kwargs
):
"""Open timeseries csv file and parse to xarray.DataArray.
"""Open timeseries csv or parquet file and parse to xarray.DataArray.
Accepts files with time index on one dimension and numeric location index on the
other dimension. In case of string location indices, non-numeric parts are
@@ -387,9 +387,16 @@ open_timeseries_from_table(
da: xarray.DataArray
DataArray
"""
kwargs0 = dict(index_col=0, parse_dates=True)
kwargs0.update(**kwargs)
df = pd.read_csv(fn, **kwargs0)
_, ext = splitext(fn)
if ext == ".csv":
kwargs0 = dict(index_col=0, parse_dates=True)
kwargs0.update(**kwargs)
df = pd.read_csv(fn, **kwargs0)
elif ext in [".parquet", ".pq"]:
df = pd.read_parquet(fn, **kwargs)
else:
raise ValueError(f"Unknown table file format: {ext}")

# check if time index
if np.dtype(df.index).type != np.datetime64:
try:
@@ -422,20 +429,21 @@ def open_vector(
logger=logger,
**kwargs,
):
"""Open fiona-compatible geometry, csv, excel or xy file and parse it.
"""Open fiona-compatible geometry, csv, parquet, excel or xy file and parse it.
Construct a :py:meth:`geopandas.GeoDataFrame` CSV or XLS file are
Construct a :py:meth:`geopandas.GeoDataFrame`. CSV, parquet, or XLS files are
converted to point geometries based on default column names
for the x- and y-coordinates, or, if given, the x_dim and y_dim arguments.
Parameters
----------
fn : str
path to geometry file
driver: {'csv', 'xls', 'xy', 'vector'}, optional
driver: {'csv', 'xls', 'xy', 'vector', 'parquet'}, optional
driver used to read the file: :py:func:`geopandas.read_file` for gdal vector
files, :py:meth:`hydromt.io.open_vector_from_table`
for csv, xls(x) and xy files. By default None, and infered from file extention.
for csv, parquet, xls(x) and xy files. By default None, and inferred from the
file extension.
crs: str, `pyproj.CRS`, or dict
Source coordinate reference system, ignored for files with a native crs.
dst_crs: str, `pyproj.CRS`, or dict
Expand All @@ -452,7 +460,7 @@ def open_vector(
the predicate function against each item. Requires bbox or mask.
By default 'intersects'
x_dim, y_dim : str
Name of x, y-coordinate columns, only applicable for csv or xls tables
Name of x, y-coordinate columns, only applicable for parquet, csv or xls tables
assert_gtype : {Point, LineString, Polygon}, optional
If given, assert geometry type
mode: {'r', 'a', 'w'}
@@ -470,7 +478,7 @@
"""
filtered = False
driver = driver if driver is not None else str(fn).split(".")[-1].lower()
if driver in ["csv", "xls", "xlsx", "xy"]:
if driver in ["csv", "parquet", "xls", "xlsx", "xy"]:
gdf = open_vector_from_table(fn, driver=driver, **kwargs)
else:
gdf = gpd.read_file(fn, bbox=bbox, mask=geom, mode=mode, **kwargs)
@@ -509,12 +517,13 @@ def open_vector_from_table(
crs=None,
**kwargs,
):
"""Read point geometry files from csv, xy or excel table files.
"""Read point geometry files from csv, parquet, xy or excel table files.
Parameters
----------
driver: {'csv', 'xls', 'xlsx', 'xy'}
driver: {'csv', 'parquet', 'xls', 'xlsx', 'xy'}
If 'csv' use :py:meth:`pandas.read_csv` to read the data;
If 'parquet' use :py:meth:`pandas.read_parquet` to read the data;
If 'xls' or 'xlsx' use :py:meth:`pandas.read_excel` with `engine=openpyxl`
If 'xy' use :py:meth:`pandas.read_csv` with `index_col=False`, `header=None`,
`delim_whitespace=True`.
@@ -537,13 +546,15 @@ def open_vector_from_table(
Parsed and filtered point geometries
"""
driver = driver.lower() if driver is not None else str(fn).split(".")[-1].lower()
if "index_col" not in kwargs:
if "index_col" not in kwargs and driver != "parquet":
kwargs.update(index_col=0)
if driver in ["csv"]:
if driver == "csv":
df = pd.read_csv(fn, **kwargs)
elif driver == "parquet":
df = pd.read_parquet(fn, **kwargs)
elif driver in ["xls", "xlsx"]:
df = pd.read_excel(fn, engine="openpyxl", **kwargs)
elif driver in ["xy"]:
elif driver == "xy":
x_dim = x_dim if x_dim is not None else "x"
y_dim = y_dim if y_dim is not None else "y"
kwargs.update(index_col=False, header=None, delim_whitespace=True)
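
A sketch of the corresponding read path (assuming a parquet point table that uses the default x/y
column names; the file name is hypothetical):

    import pandas as pd
    from hydromt import io

    pd.DataFrame({"x": [4.5, 5.1], "y": [52.0, 52.3], "name": ["a", "b"]}).to_parquet(
        "points.parquet"
    )
    # the driver is inferred from the .parquet extension and handled by
    # open_vector_from_table, which builds Point geometries from the x/y columns
    gdf = io.open_vector("points.parquet", crs=4326)
    assert set(gdf.geometry.type) == {"Point"}
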
2 changes: 1 addition & 1 deletion hydromt/stats/extremes.py
@@ -898,7 +898,7 @@ def get_lmom(x, nmom=4):
vector of (nmom) L-moments
"""
n = len(x)
xs = np.msort(x)
xs = np.sort(x, axis=0)
bb = np.zeros(nmom - 1)
ll = np.zeros(nmom - 1)
b0 = xs.mean(axis=0)
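
Side note on this unrelated fix: np.msort was deprecated in NumPy 1.24 and later removed;
np.sort(x, axis=0) is the documented drop-in replacement, as a quick check illustrates:

    import numpy as np

    x = np.array([[3.0, 1.0], [2.0, 5.0], [4.0, 0.0]])
    # sort each column independently, i.e. along the first axis (what msort did)
    assert np.array_equal(np.sort(x, axis=0), [[2.0, 0.0], [3.0, 1.0], [4.0, 5.0]])
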
8 changes: 8 additions & 0 deletions tests/data/parameters_data.yml
@@ -9,3 +9,11 @@ vito_mapping:
source_info: landuse parameters based on vito classification (https://land.copernicus.eu/global/products/lc)
source_version: 1.0
path: vito_mapping.csv
vito_mapping_parquet:
data_type: DataFrame
driver: parquet
meta:
category: landuse
source_info: landuse parameters based on vito classification (https://land.copernicus.eu/global/products/lc)
source_version: 1.0
path: vito_mapping.parquet
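
A sketch of how the new catalog entry might be consumed (assuming the yaml file above is passed as
a data library via the data_libs argument, as the test suite's catalogs are loaded):

    from hydromt import DataCatalog

    cat = DataCatalog(data_libs=["tests/data/parameters_data.yml"])
    # driver: parquet -> pd.read_parquet on tests/data/vito_mapping.parquet
    df = cat.get_dataframe("vito_mapping_parquet")
    print(df.head())
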
Binary file added tests/data/vito_mapping.parquet
Binary file not shown.
18 changes: 13 additions & 5 deletions tests/test_data_adapter.py
@@ -219,13 +219,13 @@ def test_geodataset(geoda, geodf, ts, tmpdir):
assert da3.vector.crs.to_epsg() == 4326
with pytest.raises(FileNotFoundError, match="No such file or catalog source"):
data_catalog.get_geodataset("no_file.geojson")
# Test nc file writing to file
with tempfile.TemporaryDirectory() as td:
# Test nc file writing to file
GeoDatasetAdapter(fn_nc).to_file(
data_root=td, data_name="test", driver="netcdf"
)
GeoDatasetAdapter(fn_nc).to_file(
data_root=td, data_name="test1", driver="netcdf", variables="test1"
data_root=tmpdir, data_name="test1", driver="netcdf", variables="test1"
)
GeoDatasetAdapter(fn_nc).to_file(data_root=td, data_name="test", driver="zarr")

@@ -297,6 +297,14 @@ def test_dataframe(df, tmpdir):
assert isinstance(df1, pd.DataFrame)
pd.testing.assert_frame_equal(df, df1)

# test reading parquet
fn_df_parquet = str(tmpdir.join("test.parquet"))
df.to_parquet(fn_df_parquet)
data_catalog = DataCatalog()
df2 = data_catalog.get_dataframe(fn_df_parquet, driver="parquet")
assert isinstance(df2, pd.DataFrame)
pd.testing.assert_frame_equal(df, df2)

# Test FWF support
fn_fwf = str(tmpdir.join("test.txt"))
df.to_string(fn_fwf, index=False)
@@ -309,9 +317,9 @@ def test_dataframe(df, tmpdir):
if compat.HAS_OPENPYXL:
fn_xlsx = str(tmpdir.join("test.xlsx"))
df.to_excel(fn_xlsx)
df2 = data_catalog.get_dataframe(fn_xlsx, driver_kwargs=dict(index_col=0))
assert isinstance(df2, pd.DataFrame)
assert np.all(df2 == df)
df3 = data_catalog.get_dataframe(fn_xlsx, driver_kwargs=dict(index_col=0))
assert isinstance(df3, pd.DataFrame)
assert np.all(df3 == df)


def test_dataframe_unit_attrs(df: pd.DataFrame, tmpdir):
18 changes: 16 additions & 2 deletions tests/test_data_catalog.py
@@ -322,9 +322,13 @@ def test_export_global_datasets(tmpdir):
def test_export_dataframe(tmpdir, df, df_time):
# Write two csv files
fn_df = str(tmpdir.join("test.csv"))
fn_df_parquet = str(tmpdir.join("test.parquet"))
df.to_csv(fn_df)
df.to_parquet(fn_df_parquet)
fn_df_time = str(tmpdir.join("test_ts.csv"))
fn_df_time_parquet = str(tmpdir.join("test_ts.parquet"))
df_time.to_csv(fn_df_time)
df_time.to_parquet(fn_df_time_parquet)

# Test to_file method (needs reading)
data_dict = {
@@ -345,6 +349,16 @@ def test_export_dataframe(tmpdir, df, df_time):
"parse_dates": True,
},
},
"test_df_parquet": {
"path": fn_df_parquet,
"driver": "parquet",
"data_type": "DataFrame",
},
"test_df_ts_parquet": {
"path": fn_df_time_parquet,
"driver": "parquet",
"data_type": "DataFrame",
},
}

data_catalog = DataCatalog()
@@ -356,11 +370,11 @@ def test_export_dataframe(tmpdir, df, df_time):
bbox=[11.70, 45.35, 12.95, 46.70],
)
data_catalog1 = DataCatalog(str(tmpdir.join("data_catalog.yml")))
assert len(data_catalog1.iter_sources()) == 1
assert len(data_catalog1.iter_sources()) == 2

data_catalog.export_data(str(tmpdir))
data_catalog1 = DataCatalog(str(tmpdir.join("data_catalog.yml")))
assert len(data_catalog1.iter_sources()) == 2
assert len(data_catalog1.iter_sources()) == 4
for key, source in data_catalog1.iter_sources():
dtypes = pd.DataFrame
obj = source.get_data()