Skip to content

Commit

Permalink
Merge pull request #18 from VForWaTer/single-thread
Browse files Browse the repository at this point in the history
Single thread
  • Loading branch information
mmaelicke authored Sep 17, 2024
2 parents c6eea75 + 996a5a2 commit 483e9cf
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 226 deletions.
7 changes: 3 additions & 4 deletions CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type: software
authors:
- given-names: Mirko
family-names: Mälicke
email: mirko.maelicke@KIT.edu
email: mirko.maelicke@kit.edu
affiliation: >-
Institute for Water and Environment, Hydrology,
Karlsruhe Institute for Technology (KIT)
Expand All @@ -28,7 +28,6 @@ abstract: >-
The requested datasources will be made available in the output directory of the tool. Areal datasets
will be clipped to the **bounding box** of the reference area and multi-file sources are preselected
to fall into the time range specified.
Note that exact extracts (specific time step, specific area) are not yet supported for areal datasets.
keywords:
- docker
- tool-spec
Expand All @@ -38,5 +37,5 @@ keywords:
- catchment
- metacatalog
license: CC-BY-4.0
version: '0.9.3'
date-released: '2024-07-31'
version: '0.10.0'
date-released: '2024-09-17'
26 changes: 13 additions & 13 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ RUN pip install \
xarray[complete]==2024.7.0 \
rioxarray==0.17.0 \
pyarrow==17.0.0 \
ydata-profiling==4.9.0 \
#ydata-profiling==4.9.0 \
# linux AArch64 extensions are not available for 0.9.2 -> 0.10.0 is released early Feb. 2024
"duckdb>=1.0.0" \
#"duckdb>=1.0.0" \
polars-lts-cpu==1.1.0 \
geocube==0.6.0

# install the needed version for metacatalog
RUN pip install metacatalog==0.9.1
RUN pip install metacatalog==0.9.2

# Install CDO, might be used to do seltimestep or sellonlatbox and possibly merge
RUN apt-get install -y gettext=0.21-12 \
gnuplot=5.4.4+dfsg1-2
#RUN apt-get install -y gettext=0.21-12 \
#gnuplot=5.4.4+dfsg1-2
# cdo=2.1.1-1

# create the tool input structure
Expand All @@ -50,16 +50,16 @@ COPY ./CITATION.cf[f] /src/CITATION.cff

# download a precompiled binary of duckdb
# first line checks the architecture, and replaces x86_64 with amd64, which is what duckdb uses
RUN arch=$(uname -m | sed s/x86_64/amd64/) && \
mkdir /duck && \
wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-${arch}.zip && \
unzip duckdb_cli-linux-${arch}.zip && \
rm duckdb_cli-linux-${arch}.zip && \
chmod +x ./duckdb && \
mv ./duckdb /duck/duckdb
# RUN arch=$(uname -m | sed s/x86_64/amd64/) && \
# mkdir /duck && \
# wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-${arch}.zip && \
# unzip duckdb_cli-linux-${arch}.zip && \
# rm duckdb_cli-linux-${arch}.zip && \
# chmod +x ./duckdb && \
# mv ./duckdb /duck/duckdb

# pre-install the spatial extension into duckdb as it will be used
RUN /duck/duckdb -c "INSTALL spatial;"
# RUN /duck/duckdb -c "INSTALL spatial;"

# go to the source directory of this tool
WORKDIR /src
Expand Down
34 changes: 6 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,18 @@ This tool follows the [Tool Specification](https://vforwater.github.io/tool-spec
[MetaCatalog](https://github.com/vforwater/metacatalog) stores metadata about internal and external datasets along with
information about the data sources and how to access them. Using this tool, one can request datasets (called *entries* in MetaCatalog) by their **id**. Additionally, an area of interest is supplied as a GeoJSON feature, called **reference area**.

The tool involves three main processing steps, of which only the first one is mandatory.

1. The database of the connected MetaCatalog instance is queried for the `dataset_ids`. The data-files are reuqested for
The database of the connected MetaCatalog instance is queried for the `dataset_ids`. The data-files are reuqested for
the temporal extent of `start_date` and `end_date` if given, while the spatial extent is requested for the bounding box
of `reference_area`. MetaCatalog entires without either of the scales defined are loaded entierly.
Finally, the spatial extent is clipped by the `reference_area` to match exactly. Experimental parameters are not yet
exposed, but involve:

- `netcdf_backend`, which can be either `'CDO'` or `'xarray'` (default) can switch the software used for the clip
of NetCDF data sources, which are commonly used for spatio-temporal datasets.
- `touches` is a boolean that is `false` by default and configures if areal grid cells are considered part of
`reference_area` if they touch (`touches=true`) or only contain the grid center (`touches=false`).

All processed data-files for each source are then saved to `/out/datasets/`, while multi-file sources are saved to
child repositories. The file (or folder) names are built like: `<variable_name>_<entry_id>`.

2. The second step is only performed if the parameter `integration` is **not** set to `none`.
All available data sources are converted to long-format, where each atomic data value is indexed by the value of the
axes, that form the spatial and temporal scales (if given). These files are loaded into a DuckDB, that is exported as
`/out/dataset.db` along with all metadata from MetaCatalog as JSON, and a number of database MACROs for aggregations
along the scale axes.
For each data integration defined as `integration` (one of `['temporal', 'spatial', 'spatiotemporal']`), the MACRO is
executed and the result is saved to `/out/results/<variable_name>_<entry_id>_<aggregation_scale>_aggs.parquet` containing
aggregations to all statistical moments, quartiles, the sum, Shannon Entropy and a histogram.
The means are further joined into a common `/out/results/mean_<aggregation_scale>_aggs.parquet` as the main result
outputs. The aggregation is configured via `precision` (temporal) and `resolution` (spatial). The final database
can still be used to execute other aggregations, outside of the context of this tool.

3. The last step can only be run, if the second step was performed successfully. As of now, two finishing report-like
documents are created. First [YData Profiling](https://docs.profiling.ydata.ai/latest/) is run on the
`/out/results/mean_temporal_aggs.parquet` to create a time-series exploratory data analysis (EDA) report. It is
availabe in HTML and JSON format.
The second document is a `/out/README.md`, which is created at runtime from the data in the database. Thus, the data
tables are listed accordingly and license information is extracted and presented as available in the MetaCatalog instance.

### Parameters

Expand All @@ -55,11 +35,7 @@ tables are listed accordingly and license information is extracted and presented
| reference_area | A valid GeoJSON POLYGON Feature. Areal datasets will be clipped to this area. |
| start_date | The start date of the dataset, if a time dimension applies to the dataset. |
| end_date | The end date of the dataset, if a time dimension applies to the dataset. |
| integration | The mode of operation for integrating all data files associated with each data source into a common DuckDB-based dataset. |
| keep_data_files | If set to `false`, the data files clipped to the spatial and temporal scale will not be kept. |
| precision | The precision for aggregations along the temporal scale of the datasets. |
| resolution | The resolution of the output data. This parameter is only relevant for areal datasets. |

| cell_touches | Specifies if an areal cell is part of the reference area if it only touches the geometry. |

## Development and local run

Expand Down Expand Up @@ -125,11 +101,13 @@ Each container needs at least the following structure:
|- src/
| |- tool.yml
| |- run.py
| |- CITATION.cff
```

* `inputs.json` are parameters. Whichever framework runs the container, this is how parameters are passed.
* `tool.yml` is the tool specification. It contains metadata about the scope of the tool, the number of endpoints (functions) and their parameters
* `run.py` is the tool itself, or a Python script that handles the execution. It has to capture all outputs and either `print` them to console or create files in `/out`
* `CITATION.cff` Citation file providing bibliographic information on how to cite this tool.

*Does `run.py` take runtime args?*:

Expand Down
File renamed without changes.
48 changes: 34 additions & 14 deletions src/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import rasterio as rio

from json2args.logger import logger
from writer import dispatch_save_file, entry_metadata_saver
from writer import dispatch_save_file, entry_metadata_saver, xarray_to_netcdf_saver
from param import load_params, Params
from utils import whitebox_log_handler

Expand Down Expand Up @@ -112,8 +112,10 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
# get a path for the current dataset path
dataset_base_path = params.dataset_path / f"{entry.variable.name.replace(' ', '_')}_{entry.id}"

# create a counter for the saved parts
part = 0
# preprocess each netcdf / grib / zarr file
for i, fname in enumerate(fnames):
for fname in fnames:
# read the min and max time and check if we can skip
ds = xr.open_dataset(fname, decode_coords='all', mask_and_scale=True)

Expand Down Expand Up @@ -157,13 +159,19 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
# and supress the creation of metadata files
dataset_base_path.mkdir(parents=True, exist_ok=True)

# get the filenmae
filename = f"{entry.variable.name.replace(' ', '_')}_{entry.id}"
target_name = f"{filename}_part_{i + 1}.nc"
# we will actually save, so increate the part counter
part += 1

dispatch_save_file(entry=entry, data=data, executor=executor, base_path=str(dataset_base_path), target_name=target_name, save_meta=False)
# get the filename
filename = f"{entry.variable.name.replace(' ', '_')}_{entry.id}"
target_name = f"{filename}_part_{part}.nc"

# use the dispatch_save_file function to save the data
# dispatch_save_file(entry=entry, data=data, executor=executor, base_path=str(dataset_base_path), target_name=target_name, save_meta=False)
xarray_to_netcdf_saver(data=data, target_name=str(dataset_base_path / target_name))

# if there are many files, we save the metadata only once
if i == 0:
if part == 1:
metafile_name = str(params.dataset_path / f"{filename}.metadata.json")
entry_metadata_saver(entry, metafile_name)
logger.info(f"Saved metadata for dataset <ID={entry.id}> to {metafile_name}.")
Expand Down Expand Up @@ -301,18 +309,30 @@ def error_handler(future):
logger.error(f"ERRORED: clipping dataset <ID={entry.id}>: {str(exc)}")

# collect all futures
futures = []
# futures = []

part = 1

# go for each file
for i, fname in enumerate(fnames):
for fname in fnames:
# derive an out-name
out_name = None if len(fnames) == 1 else f"{Path(fname).stem}_part_{i + 1}.tif"
if len(fnames) == 1:
out_name = f"{entry.variable.name.replace(' ', '_')}_{entry.id}.tif"
else:
out_name = f"{entry.variable.name.replace(' ', '_')}_{entry.id}_part_{part}.tif"

# submit each save task to the executor
future = executor.submit(_rio_clip_raster, fname, reference_area, dataset_base_path, out_name=out_name, touched=params.cell_touches)
future.add_done_callback(error_handler)
futures.append(future)
#future = executor.submit(_rio_clip_raster, fname, reference_area, dataset_base_path, out_name=out_name, touched=params.cell_touches)
#future.add_done_callback(error_handler)
#futures.append(future)

# call procedurally
out_path = _rio_clip_raster(fname, reference_area, base_path=dataset_base_path, out_name=out_name, touched=params.cell_touches)
if out_path is not None:
part += 1

# wait until all are finished
tiles = [future.result() for future in futures if future.result() is not None]
#tiles = [future.result() for future in futures if future.result() is not None]

# run the merge function and delete the other files
# if len(tiles) > 1:
Expand Down
33 changes: 3 additions & 30 deletions src/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
import geopandas as gpd


# create the Enum for integration type
class Integrations(str, Enum):
TEMPORAL = 'temporal'
SPATIAL = 'spatial'
SPATIO_TEMPORAL = 'spatiotemporal'
ALL = 'all'
NONE = 'none'
FULL = 'full'


class NetCDFBackends(str, Enum):
XARRAY = 'xarray'
Expand All @@ -39,31 +30,17 @@ class Params(BaseModel):
# optional parameters to configure the processing
start_date: datetime = None
end_date: datetime = None
integration: Integrations = Integrations.ALL
apply_aggregation: bool = False

# optional parameter to configure output
keep_data_files: bool = True
database_name: str = 'dataset.duckdb'

# optional parameter to provide result output
precision: str = 'day'
resolution: int = 5000
cell_touches: bool = True

# stuff that we do not change in the tool
base_path: str = '/out'
dataset_folder_name: str = 'datasets'
netcdf_backend: NetCDFBackends = NetCDFBackends.XARRAY

# duckdb settings
use_spatial: bool = False

@property
def dataset_path(self) -> Path:
if self.keep_data_files:
p = Path(self.base_path) / 'datasets'
else:
p = Path(tempfile.mkdtemp())
# set the databsets path
p = Path(self.base_path) / self.dataset_folder_name

# make the directory if it does not exist
p.mkdir(parents=True, exist_ok=True)
Expand All @@ -79,10 +56,6 @@ def result_path(self) -> Path:

return p

@property
def database_path(self) -> Path:
return Path(self.base_path) / self.database_name

@property
def reference_area_df(self) -> gpd.GeoDataFrame:
return gpd.GeoDataFrame.from_features([self.reference_area])
Expand Down
Loading

0 comments on commit 483e9cf

Please sign in to comment.