
Commit

add duckdb ingestion
mmaelicke committed Feb 2, 2024
1 parent 8f61066 commit 4ac96ab
Showing 4 changed files with 36 additions and 19 deletions.
5 changes: 0 additions & 5 deletions src/ingestor.py
@@ -2,14 +2,10 @@
import time as time
import glob
from pathlib import Path
from enum import Enum

from tqdm import tqdm
import rioxarray
import xarray as xr
import geopandas as gpd
from dask.dataframe import DataFrame
from geocube.api.core import make_geocube
import duckdb
from metacatalog.models import Entry

@@ -406,7 +402,6 @@ def add_temporal_integration(entry: Entry, table_name: str, database_path: Optio
db.execute(sql)



def add_spatial_integration(entry: Entry, table_name: str, spatio_temporal: bool = False, database_path: Optional[str] = None, funcs: Optional[List[str]] = None, target_epsg: int = 3857, algin_cell: str = 'center') -> None:
# if there is no spatial dimension, we cannot integrate
if entry.datasource.spatial_scale is None:
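For context, the duckdb import added to src/ingestor.py is used to collect the per-entry data files into a single database file. A minimal sketch of that idea, assuming Parquet output files under /out/datasets and a placeholder table name; this is not the repository's actual load_files implementation:

import duckdb

# open (or create) the DuckDB database file; the path is an assumption for illustration
con = duckdb.connect("/out/dataset.db")

# read_parquet accepts glob patterns, so all processed files can land in one table
con.execute("CREATE TABLE data AS SELECT * FROM read_parquet('/out/datasets/*.parquet')")
con.close()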
4 changes: 2 additions & 2 deletions src/loader.py
@@ -216,9 +216,9 @@ def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params:
# then the region clip
if entry.datasource.temporal_scale is not None:
time_dim = entry.datasource.temporal_scale.dimension_names[0]
ds.chunk({entry.datasource.temporal_scale.dimension_names[0]: 1})
ds.chunk({entry.datasource.temporal_scale.dimension_names[0]: 'auto'})

logger.info(f"python - ds.chunk{{'{time_dim}': 1}})")
logger.info(f"python - ds.chunk{{'{time_dim}': 'auto'}})")
else:
time_dim = None

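The loader change above switches the chunking along the time dimension from one time step per chunk to dask's 'auto' sizing. A minimal sketch of the pattern, with an illustrative file and dimension name; note that xarray's Dataset.chunk returns a new, lazily chunked dataset, so the result is typically reassigned:

import xarray as xr

# open a NetCDF file lazily; the file name and the "time" dimension are assumptions
ds = xr.open_dataset("example.nc")

# let dask choose reasonable chunk sizes along the time dimension
ds = ds.chunk({"time": "auto"})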
16 changes: 5 additions & 11 deletions src/param.py
@@ -40,17 +40,18 @@ class Params(BaseModel):
integration: Integrations = Integrations.ALL

# optional parameter to configure output
keep_intermediate: bool = False
keep_data_files: bool = True

database_name: str = 'dataset.db'

# stuff that we do not change in the tool
base_path: str = '/out'
netcdf_backend: NetCDFBackends = NetCDFBackends.XARRAY

@property
def intermediate_path(self) -> Path:
if self.keep_intermediate:
p = Path(self.base_path) / 'intermediate'
def dataset_path(self) -> Path:
if self.keep_data_files:
p = Path(self.base_path) / 'datasets'
else:
p = Path(tempfile.mkdtemp())

@@ -60,13 +61,6 @@ def intermediate_path(self) -> Path:
# return the path
return p

@property
def dataset_path(self) -> Path:
p = Path(self.base_path) / 'datasets'
p.mkdir(parents=True, exist_ok=True)

return p

@property
def database_path(self) -> Path:
return Path(self.base_path) / self.database_name
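For illustration, here is how the renamed flag plays out when using the Params model from src/param.py. This is a sketch only; any other required fields of Params are omitted and assumed to have defaults:

# sketch only: assumes the remaining Params fields have defaults or are filled in
params = Params(keep_data_files=True)
params.dataset_path    # -> Path('/out/datasets')

params = Params(keep_data_files=False)
params.dataset_path    # -> a throwaway directory from tempfile.mkdtemp()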
30 changes: 29 additions & 1 deletion src/run.py
@@ -1,5 +1,7 @@
import os
import sys
from datetime import datetime as dt
import time
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

@@ -8,9 +10,10 @@
from metacatalog import api
from tqdm import tqdm

from param import load_params
from param import load_params, Integrations
from loader import load_entry_data
from logger import logger
import ingestor
from clip import reference_area_to_file

# parse parameters
@@ -77,8 +80,15 @@
with open('/out/processing.log', 'w') as f:
f.write(MSG)

# if the integration is set to NONE and the user does not want to keep the data files, there will be no output
if params.integration == Integrations.NONE and not params.keep_data_files:
logger.critical("You have set the integration to NONE and do not want to keep the data files. This will result in no output.")
sys.exit(1)

# --------------------------------------------------------------------------- #
# Here is the actual tool
tool_start = time.time()

# save the reference area to a file for later reuse
if params.reference_area is not None:
reference_area = reference_area_to_file()
@@ -105,8 +115,26 @@
logger.info(f"Pool {type(executor).__name__} finished all tasks and shutdown.")

# here to the stuff for creating a consistent dataset
# check if the user disabled integration
if params.integration == Integrations.NONE:
logger.debug("Integration is disabled. No further processing will be done.")

# check if we have any files to process
elif len(file_mapping) > 0:
logger.info(f"Starting to create a consistent DUckDB dataset at {params.database_path}. Check out https://duckdb.org/docs/api/overview to learn more about DuckDB.")

# start a timer
t1 = time.time()
path = ingestor.load_files(file_mapping=file_mapping)
t2 = time.time()
logger.info(f"Finished creating the dataset at {path} in {t2-t1:.2f} seconds.")
else:
logger.warning("It seems like no data files have been processed. This might be an error.")

# --------------------------------------------------------------------------- #
# we're finished.
t2 = time.time()
logger.info(f"Total runtime: {t2-tool_start:.2f} seconds.")

# print out the report
with open('/out/processing.log', 'r') as f:
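Once run.py has finished, the dataset referenced in the log message is a regular DuckDB file that can be inspected with the DuckDB Python client or the duckdb CLI. A minimal sketch; the table name is a placeholder and depends on what the ingestor created:

import duckdb

con = duckdb.connect("/out/dataset.db", read_only=True)
print(con.execute("SHOW TABLES").fetchall())
print(con.execute("SELECT COUNT(*) FROM some_table").fetchone())  # placeholder table name
con.close()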
