Skip to content

Commit

Permalink
Merge pull request #12 from VForWaTer/update-duckdb
Browse files Browse the repository at this point in the history
Update duckdb to stable version
  • Loading branch information
mmaelicke authored Jul 9, 2024
2 parents afd9df3 + 2f8e0a9 commit 6e22e78
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN pip install ipython==8.26.0 \
pyarrow==14.0.1 \
ydata-profiling==4.6.4 \
# linux AArch64 extensions are not available for 0.9.2 -> 0.10.0 is released early Feb. 2024
duckdb==0.8.0 \
"duckdb>=1.0.0" \
polars==0.19.19 \
geocube

Expand All @@ -48,7 +48,7 @@ RUN mv /whitebox/WhiteboxTools_linux_amd64/WBT /src/WBT
# first line checks the architecture, and replaces x86_64 with amd64, which is what duckdb uses
RUN arch=$(uname -m | sed s/x86_64/amd64/) && \
mkdir /duck && \
wget https://github.com/duckdb/duckdb/releases/download/v0.8.0/duckdb_cli-linux-${arch}.zip && \
wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-${arch}.zip && \
unzip duckdb_cli-linux-${arch}.zip && \
rm duckdb_cli-linux-${arch}.zip && \
chmod +x ./duckdb && \
Expand Down
8 changes: 4 additions & 4 deletions src/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,18 @@ def aggregate_scale(aggregation_scale: str, executor: Executor, precision: Optio
if aggregation_scale == 'temporal':
on = ['time']
elif aggregation_scale == 'spatial':
on = ['lon', 'lat']
on = ['x', 'y']
elif aggregation_scale == 'spatiotemporal':
on = ['time', 'lon', 'lat']
on = ['time', 'x', 'y']

# extract only the index and column 'mean'
mean = df[[*on, 'mean']].rename({'mean': layer})
mean = df[[*on, 'mean']].clone().rename({'mean': layer})

# join the means
if means is None:
means = mean
else:
means = means.join(mean, on=on, how='outer')
means = means.join(mean, on=on, how='outer').clone()

# finally save the means
path = params.result_path / f"mean_{aggregation_scale}_aggs.parquet"
Expand Down
15 changes: 6 additions & 9 deletions src/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ def _table_exists(table_name: str) -> bool:
return False


def _create_datasource_table(entry: Entry, table_name: str, use_spatial: bool = False) -> str:
if use_spatial:
raise NotImplementedError('There is still an error with the spatial type.')
def _create_datasource_table(entry: Entry, table_name: str) -> str:
# get the parameters
params = load_params()

Expand All @@ -102,7 +100,7 @@ def _create_datasource_table(entry: Entry, table_name: str, use_spatial: bool =
column_names.append(f" time TIMESTAMP")

# spatial dimensions
if len(spatial_dims) == 2 and use_spatial:
if len(spatial_dims) == 2 and params.use_spatial:
column_names.append(f" cell BOX_2D")
else:
column_names.append(' ' + ','.join([f" {name} DOUBLE" for dim, name in zip(spatial_dims, SPATIAL_DIMENSIONS)]))
Expand All @@ -126,9 +124,8 @@ def _create_datasource_table(entry: Entry, table_name: str, use_spatial: bool =
return dbname


def _create_insert_sql(entry: Entry, table_name: str, source_name: str = 'df', use_spatial: bool = False) -> str:
if use_spatial:
raise NotImplementedError('There is still an error with the spatial type.')
def _create_insert_sql(entry: Entry, table_name: str, source_name: str = 'df') -> str:
params = load_params()

# get the dimension names
spatial_dims = entry.datasource.spatial_scale.dimension_names if entry.datasource.spatial_scale is not None else []
Expand All @@ -146,7 +143,7 @@ def _create_insert_sql(entry: Entry, table_name: str, source_name: str = 'df', u
column_names.append(f" {temporal_dims[0]} as time ")

# spatial dimensions
if len(spatial_dims) == 2 and use_spatial:
if len(spatial_dims) == 2 and params.use_spatial:
column_names.append(f" ({','.join(spatial_dims)})::BOX_2D AS cell ")
else:
column_names.append(' ' + ', '.join([f"{dim} AS {name}" for dim, name in zip(spatial_dims, SPATIAL_DIMENSIONS)]))
Expand Down Expand Up @@ -258,7 +255,7 @@ def load_xarray_to_duckdb(entry: Entry, data: xr.Dataset) -> str:

# get a delayed dask dataframe
try:
ddf = data.to_dask_dataframe()[dimension_names]
ddf = data.to_dask_dataframe()[dimension_names].dropna()
except ValueError as e:
# check this is the chunking error
if 'Object has inconsistent chunks' in str(e):
Expand Down
3 changes: 3 additions & 0 deletions src/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Params(BaseModel):
base_path: str = '/out'
netcdf_backend: NetCDFBackends = NetCDFBackends.XARRAY

# duckdb settings
use_spatial: bool = False

@property
def dataset_path(self) -> Path:
if self.keep_data_files:
Expand Down

0 comments on commit 6e22e78

Please sign in to comment.