diff --git a/src/ingestor.py b/src/ingestor.py
index e3e3da5..731f48d 100644
--- a/src/ingestor.py
+++ b/src/ingestor.py
@@ -2,14 +2,10 @@
 import time as time
 import glob
 from pathlib import Path
-from enum import Enum
 
 from tqdm import tqdm
 import rioxarray
 import xarray as xr
-import geopandas as gpd
-from dask.dataframe import DataFrame
-from geocube.api.core import make_geocube
 import duckdb
 
 from metacatalog.models import Entry
@@ -406,7 +402,6 @@ def add_temporal_integration(entry: Entry, table_name: str, database_path: Optio
         db.execute(sql)
 
 
-
 def add_spatial_integration(entry: Entry, table_name: str, spatio_temporal: bool = False, database_path: Optional[str] = None, funcs: Optional[List[str]] = None, target_epsg: int = 3857, algin_cell: str = 'center') -> None:
     # if there is no spatial dimension, we cannot integrate
     if entry.datasource.spatial_scale is None:
diff --git a/src/loader.py b/src/loader.py
index 9a27809..17cf443 100644
--- a/src/loader.py
+++ b/src/loader.py
@@ -216,9 +216,9 @@ def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params:
     # then the region clip
     if entry.datasource.temporal_scale is not None:
         time_dim = entry.datasource.temporal_scale.dimension_names[0]
-        ds.chunk({entry.datasource.temporal_scale.dimension_names[0]: 1})
+        ds.chunk({entry.datasource.temporal_scale.dimension_names[0]: 'auto'})
 
-        logger.info(f"python - ds.chunk{{'{time_dim}': 1}})")
+        logger.info(f"python - ds.chunk{{'{time_dim}': 'auto'}})")
 
     else:
         time_dim = None
diff --git a/src/param.py b/src/param.py
index ef91afd..d7bea6a 100644
--- a/src/param.py
+++ b/src/param.py
@@ -40,7 +40,8 @@ class Params(BaseModel):
     integration: Integrations = Integrations.ALL
 
     # optional parameter to configure output
-    keep_intermediate: bool = False
+    keep_data_files: bool = True
+    database_name: str = 'dataset.db'
 
     # stuff that we do not change in the tool
 
@@ -48,9 +49,9 @@
     netcdf_backend: NetCDFBackends = NetCDFBackends.XARRAY
 
     @property
-    def intermediate_path(self) -> Path:
-        if self.keep_intermediate:
-            p = Path(self.base_path) / 'intermediate'
+    def dataset_path(self) -> Path:
+        if self.keep_data_files:
+            p = Path(self.base_path) / 'datasets'
         else:
             p = Path(tempfile.mkdtemp())
 
@@ -60,13 +61,6 @@ def intermediate_path(self) -> Path:
         # return the path
         return p
 
-    @property
-    def dataset_path(self) -> Path:
-        p = Path(self.base_path) / 'datasets'
-        p.mkdir(parents=True, exist_ok=True)
-
-        return p
-
     @property
     def database_path(self) -> Path:
         return Path(self.base_path) / self.database_name
diff --git a/src/run.py b/src/run.py
index f9f6952..6957d51 100644
--- a/src/run.py
+++ b/src/run.py
@@ -1,5 +1,7 @@
 import os
+import sys
 from datetime import datetime as dt
+import time
 from pathlib import Path
 from concurrent.futures import ProcessPoolExecutor
 
@@ -8,9 +10,10 @@
 from metacatalog import api
 from tqdm import tqdm
 
-from param import load_params
+from param import load_params, Integrations
 from loader import load_entry_data
 from logger import logger
+import ingestor
 from clip import reference_area_to_file
 
 # parse parameters
@@ -77,8 +80,15 @@
 with open('/out/processing.log', 'w') as f:
     f.write(MSG)
 
+# if the integration is set to NONE and the user does not want to keep the data files, there will be no output
+if params.integration == Integrations.NONE and not params.keep_data_files:
+    logger.critical("You have set the integration to NONE and do not want to keep the data files. This will result in no output.")
+    sys.exit(1)
+
 # --------------------------------------------------------------------------- #
 # Here is the actual tool
+tool_start = time.time()
+
 # save the reference area to a file for later reuse
 if params.reference_area is not None:
     reference_area = reference_area_to_file()
@@ -105,8 +115,26 @@
 logger.info(f"Pool {type(executor).__name__} finished all tasks and shutdown.")
 
 # here to the stuff for creating a consistent dataset
+# check if the user disabled integration
+if params.integration == Integrations.NONE:
+    logger.debug("Integration is disabled. No further processing will be done.")
+
+# check if we have any files to process
+elif len(file_mapping) > 0:
+    logger.info(f"Starting to create a consistent DuckDB dataset at {params.database_path}. Check out https://duckdb.org/docs/api/overview to learn more about DuckDB.")
+
+    # start a timer
+    t1 = time.time()
+    path = ingestor.load_files(file_mapping=file_mapping)
+    t2 = time.time()
+    logger.info(f"Finished creating the dataset at {path} in {t2-t1:.2f} seconds.")
+else:
+    logger.warning("It seems like no data files have been processed. This might be an error.")
 
 # --------------------------------------------------------------------------- #
+# we're finished.
+t2 = time.time()
+logger.info(f"Total runtime: {t2-tool_start:.2f} seconds.")
 
 # print out the report
 with open('/out/processing.log', 'r') as f: