Commit ae94d28

implement the direct parquet-file saving
mmaelicke committed Jul 17, 2024
1 parent 3efcc95 commit ae94d28
Showing 3 changed files with 47 additions and 5 deletions.
examples/dem/docker-compose.yml (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ services:
       - ../../data/pg_data:/var/lib/postgresql/data
     healthcheck:
       test: ["CMD-SHELL", "sh -c 'pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}'"]
-      interval: 50s
+      interval: 5s
       timeout: 10s
       retries: 5

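Why the shorter probe interval matters: Docker fires the first health probe roughly one `interval` after the container starts, so with `interval: 50s` anything gated on this database via `service_healthy` could idle for close to a minute; at 5s the gate opens almost immediately. A rough back-of-the-envelope sketch in Python, assuming Docker's documented semantics (first probe after one interval, unhealthy declared after `retries` consecutive failures, each probe allowed up to `timeout`):

def time_to_healthy(interval_s: float) -> float:
    """Earliest moment an already-ready database is marked healthy."""
    return interval_s  # the first probe only fires after one interval

def time_to_unhealthy(interval_s: float, timeout_s: float, retries: int) -> float:
    """Worst-case delay before a dead database is flagged unhealthy."""
    return retries * (interval_s + timeout_s)

print(time_to_healthy(50))          # old setting: ~50 s of idle waiting
print(time_to_healthy(5))           # new setting: ~5 s
print(time_to_unhealthy(5, 10, 5))  # failure still detected within ~75 s
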
src/aggregator.py (44 changes: 43 additions & 1 deletion)
@@ -69,6 +69,44 @@ def available_aggregations(database_path: Optional[str] = None) -> Dict[str, LayerInfo]:
     return layers
 
 
+def aggregate_to_file(table_name: str, aggregation_scale: str, precision: str = 'day', resolution: int = 1000, layers: Optional[Dict[str, LayerInfo]] = None, aggregator: str = 'mean', database_path: Optional[str] = None) -> bool:
+    # check if we have a database path
+    db_path = _get_database_path(database_path)
+
+    # make sure that the current aggregation scale is supported
+    if aggregation_scale != 'temporal':
+        raise NotImplementedError("Only temporal aggregations are supported right now")
+
+    # first check if layers was given
+    if layers is None:
+        layers = available_aggregations(database_path=db_path)
+
+    # check if the table name and aggregation_scale is in the layers
+    try:
+        macro = layers[table_name]['aggregations'][aggregation_scale]
+    except KeyError:
+        raise ValueError(f"The dataset {db_path} does not contain an aggregation MACRO for {aggregation_scale} aggregations on table {table_name}")
+
+    # connect and run the aggregation query
+    with duckdb.connect(db_path, read_only=True) as db:
+        db.load_extension('spatial')
+
+        # build the query
+        if aggregation_scale == 'temporal':
+            sql = f"COPY (SELECT time, {aggregator} FROM {macro}('{precision}')) TO '{aggregator}_{table_name}_{aggregation_scale}_aggs.parquet' (FORMAT PARQUET);"
+
+        # start the timer
+        t1 = time.time()
+        db.execute(sql)
+        t2 = time.time()
+
+        # logging
+        logger.info(f"duckdb - {sql}")
+        logger.info(f"took {t2 - t1:.2f} seconds")
+
+    return True
+
+
 def run_aggregation(table_name: str, aggregation_scale: str, precision: str = 'day', resolution: int = 1000, layers: Optional[Dict[str, LayerInfo]] = None, database_path: Optional[str] = None) -> pl.DataFrame:
     # check if we have a database path
     db_path = _get_database_path(database_path)
@@ -132,7 +170,11 @@ def aggregate_scale(aggregation_scale: str, executor: Executor, precision: Optio
     for layer in layers.keys():
         # do the aggregation
         try:
-            df = run_aggregation(table_name=layer, aggregation_scale=aggregation_scale, precision=precision, resolution=resolution, layers=layers, database_path=db_path)
+            if aggregation_scale == 'temporal':
+                aggregate_to_file(table_name=layer, aggregator='mean', aggregation_scale='temporal', precision=precision, resolution=resolution, layers=layers, database_path=db_path)
+                continue
+            else:
+                df = run_aggregation(table_name=layer, aggregation_scale=aggregation_scale, precision=precision, resolution=resolution, layers=layers, database_path=db_path)
         except Exception as e:
             logger.error(str(e))
             continue
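
For orientation, a minimal usage sketch of the new function. The table name and database path below are hypothetical placeholders, and the import assumes src/ is on the import path; only the 'temporal' scale is accepted so far:

from aggregator import aggregate_to_file  # assumes src/ is importable

aggregate_to_file(
    table_name='temperature',              # hypothetical table in the dataset
    aggregation_scale='temporal',          # the only supported scale right now
    precision='day',                       # forwarded to the temporal MACRO
    aggregator='mean',                     # column pulled from the MACRO result
    database_path='/data/dataset.duckdb',  # hypothetical dataset location
)

# The COPY statement writes '<aggregator>_<table>_<scale>_aggs.parquet' into
# the working directory, here: mean_temperature_temporal_aggs.parquet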
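The core idea of the change is DuckDB's COPY (query) TO 'file' (FORMAT PARQUET), which streams a query result straight to disk instead of materializing it as a polars DataFrame first. A self-contained sketch against an in-memory database, with a toy table standing in for the dataset's aggregation MACRO:

import duckdb

with duckdb.connect() as db:  # in-memory database
    # toy stand-in for the result of a temporal aggregation MACRO
    db.execute("CREATE TABLE readings AS SELECT range AS time, random() AS value FROM range(10)")
    # the result set is written directly to Parquet, never entering Python
    db.execute("COPY (SELECT time, avg(value) AS mean FROM readings GROUP BY time) TO 'mean_readings.parquet' (FORMAT PARQUET);")
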
src/run.py (6 changes: 3 additions & 3 deletions)
@@ -142,19 +142,19 @@
 logger.warning("It seems like no data files have been processed. This might be an error.")
 
 # switch the type of integrations
-if params.integration != Integrations.NONE:
+if params.integration != Integrations.NONE and params.apply_aggregation:
     with PoolExecutor() as executor:
         logger.debug(f"START {type(executor).__name__} - Pool to ingest data files into a Dataset DuckDB database.")
 
         if params.integration == Integrations.TEMPORAL or params.integration == Integrations.ALL:
             # run the temporal aggregation
             aggregator.aggregate_scale(aggregation_scale='temporal', executor=executor)
 
-        if params.integration == Integrations.SPATIAL or params.integration == Integrations.ALL:
+        if params.integration == Integrations.SPATIAL or params.integration == Integrations.ALL and False:
             # run the spatial aggregation
             aggregator.aggregate_scale(aggregation_scale='spatial', executor=executor)
 
-        if params.integration == Integrations.SPATIO_TEMPORAL or params.integration == Integrations.ALL:
+        if params.integration == Integrations.SPATIO_TEMPORAL or params.integration == Integrations.ALL and False:
             # run the spatio-temporal aggregation
             aggregator.aggregate_scale(aggregation_scale='spatiotemporal', executor=executor)
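One behavioral subtlety in the new guards: Python binds `and` tighter than `or`, so `A or B and False` parses as `A or (B and False)`, which reduces to `A`. The appended `and False` therefore only suppresses the Integrations.ALL path; an explicit Integrations.SPATIAL or Integrations.SPATIO_TEMPORAL run still enters the branch. A quick check with plain booleans:

is_spatial, is_all = True, False
print(is_spatial or is_all and False)  # True: an explicit SPATIAL run still fires
is_spatial, is_all = False, True
print(is_spatial or is_all and False)  # False: only the ALL path is disabled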
