Commit
add some debug statements
mmaelicke committed Apr 23, 2024
1 parent ab7f036 commit e1c113b
Showing 4 changed files with 48 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/aggregator.py
@@ -1,4 +1,4 @@
- from typing import Optional, List, Dict, TypedDict
+ from typing import Optional, Dict, TypedDict
from collections import defaultdict
import time
from concurrent.futures import Executor
63 changes: 11 additions & 52 deletions src/loader.py
@@ -11,7 +11,6 @@
import geopandas as gpd
import rasterio as rio


from logger import logger
from writer import dispatch_save_file, entry_metadata_saver
from param import load_params, Params
@@ -62,15 +61,22 @@ def load_file_source(entry: Entry, executor: Executor) -> str:
name = entry.datasource.path
path = Path(name)

# debug the path
logger.debug(f"Metacatalog entry datasource path: {path}; exists: {path.exists()}")

# go for the different suffixes
- if path.suffix.lower() in ('.nc', '.netcdf', '.cdf', 'nc4'):
+ if path.suffix.lower() in ('.nc', '.netcdf', '.cdf', '.nc4'):
# load the netCDF file in time & space chunks to the output folder
out_path = load_netcdf_file(entry, executor=executor)

# return the dataset
return out_path
elif path.suffix.lower() in ('.tif', '.tiff', '.dem'):
logger.error("You tried to process a GeoTiff, which is not supported yet.")
raise NotImplementedError('GeoTiff loader is currently not implemented, sorry.')

else:
raise ValueError(f"Unknown file type for {path.suffix}. This tool does not support these kinds of files (yet).")


def load_netcdf_file(entry: Entry, executor: Executor) -> str:
@@ -124,17 +130,15 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
ds.close()
logger.warning(f"The dataset {fname} does not contain a datetime coordinate.")

#
# this does not work for some netCDF files (e.g. HYRAS)
if params.netcdf_backend == 'cdo':
ds.close()
path = _clip_netcdf_cdo(fname, params)

# TODO: do the mergetime here
pass

return path

elif params.netcdf_backend == 'xarray':
data = _clip_netcdf_xarray(entry, fname, ds, params)

elif params.netcdf_backend == 'parquet':
# use the xarray clip first
ds = _clip_netcdf_xarray(entry, fname, ds, params)
@@ -297,48 +301,3 @@ def load_raster_file(entry: Entry, name: str, reference_area: dict, base_path: s
out_path = Path(base_path) / Path(fname).name
with rio.open(str(out_path), 'w', **out_meta) as dst:
dst.write(out_raster)


# deprecated
# we do not merge them anymore
def merge_multi_file_netcdf(entry: Entry, path: str, save_nc: bool = True, save_parquet: bool = True) -> pd.DataFrame:
# check if this file should be saved
if save_nc:
out_name = f'/out/{entry.variable.name.replace(" ", "_")}_{entry.id}_lonlatbox.nc'
else:
out_name = f'{path}/merged_lonlatbox.nc'

# build the CDO command
merge_cmd = ['cdo', 'mergetime', str(Path(path) / '*.nc'), out_name]

# run merge command
t1 = time.time()
subprocess.run(merge_cmd)
t2 = time.time()
logger.info(' '.join(merge_cmd))
logger.info(f"took {t2-t1:.2f} seconds")

# open the merged data
# TODO infer time_axis from the entry and figure out a useful time_axis chunk size here
data = xr.open_dataset(out_name, decode_coords=True, mask_and_scale=True, chunks={'time': 1})

if not save_parquet:
return data

# TODO: put this into an extra STEP
# TODO: figure out axis_names from the entry here; THIS IS NOT REALLY USEFUL
time_axis = next(([_] for _ in ('tstamp', 'time', 'date', 'datetime') if _ in data.coords), [])
x_axis = next(([_] for _ in ('lon', 'longitude', 'x') if _ in data.coords), [])
y_axis = next(([_] for _ in ('lat', 'latitude', 'y') if _ in data.coords), [])
var_name = [_ for _ in ('pr', 'hurs', 'tas', 'rsds', 'tasmin', 'tasmax') if _ in data.data_vars]
variable_names = [*time_axis, *x_axis, *y_axis, *var_name]

# convert to long format
t1 = time.time()
df = data[var_name].to_dask_dataframe()[variable_names]
t2 = time.time()
logger.debug(f"Converting {out_name} to long format in {t2-t1:.2f} seconds.")

return df
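The coordinate lookup above uses next() over a generator with a default: it returns the first candidate name that is actually present, wrapped in a one-element list so the results can be combined with *, or an empty list if nothing matches. The same idiom in isolation, with made-up coordinate names:

    # stand-in for data.coords / data.data_vars of an opened dataset
    coords = {'time': None, 'lon': None, 'lat': None}

    time_axis = next(([c] for c in ('tstamp', 'time', 'date', 'datetime') if c in coords), [])
    x_axis = next(([c] for c in ('lon', 'longitude', 'x') if c in coords), [])
    y_axis = next(([c] for c in ('lat', 'latitude', 'y') if c in coords), [])
    print(time_axis, x_axis, y_axis)   # ['time'] ['lon'] ['lat']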


1 change: 0 additions & 1 deletion src/reporter.py
@@ -3,7 +3,6 @@

import pandas as pd
from ydata_profiling import ProfileReport
from concurrent.futures import ThreadPoolExecutor as PoolExecutor

from param import load_params
from logger import logger
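For context, the ProfileReport import that stays in reporter.py comes from ydata_profiling; reporter.generate_profile_report presumably wraps something along these lines. A minimal sketch under that assumption, with a made-up DataFrame and output path:

    import pandas as pd
    from ydata_profiling import ProfileReport

    df = pd.DataFrame({'tas': [280.1, 281.3, 279.8]})       # hypothetical data
    profile = ProfileReport(df, title='Dataset profile', minimal=True)
    profile.to_file('/out/profile_report.html')             # assumed output location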
68 changes: 36 additions & 32 deletions src/run.py
@@ -95,6 +95,9 @@
# Here is the actual tool
tool_start = time.time()

# debug the params before we do anything with them
logger.debug(f"JSON dump of parameters received: {params.model_dump_json()}")

# save the reference area to a file for later reuse
if params.reference_area is not None:
reference_area = reference_area_to_file()
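The new params debug line uses model_dump_json(), the pydantic v2 serializer, which suggests Params is a pydantic model; dumping it once puts the full, validated configuration into the log. A minimal sketch of the pattern, with made-up fields:

    from pydantic import BaseModel

    class Params(BaseModel):              # stand-in for the real Params in param.py
        integration: str = 'all'
        netcdf_backend: str = 'xarray'

    params = Params()
    print(params.model_dump_json())       # {"integration":"all","netcdf_backend":"xarray"}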
@@ -139,53 +142,54 @@
logger.warning("It seems like no data files have been processed. This might be an error.")

# switch the type of integrations
with PoolExecutor() as executor:
logger.debug(f"START {type(executor).__name__} - Pool to ingest data files into a Dataset DuckDB database.")

if params.integration == Integrations.TEMPORAL or params.integration == Integrations.ALL:
# run the temporal aggregation
aggregator.aggregate_scale(aggregation_scale='temporal', executor=executor)

if params.integration == Integrations.SPATIAL or params.integration == Integrations.ALL:
# run the spatial aggregation
aggregator.aggregate_scale(aggregation_scale='spatial', executor=executor)

if params.integration == Integrations.SPATIO_TEMPORAL or params.integration == Integrations.ALL:
# run the spatio-temporal aggregation
aggregator.aggregate_scale(aggregation_scale='spatiotemporal', executor=executor)

# wait until all results are finished
executor.shutdown(wait=True)
logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")


# finally run a third pool to generate reports
with PoolExecutor() as executor:
logger.debug(f"START {type(executor).__name__} - Pool to generate final reports.")
if params.integration != Integrations.NONE:
with PoolExecutor() as executor:
logger.debug(f"START {type(executor).__name__} - Pool to ingest data files into a Dataset DuckDB database.")

if params.integration == Integrations.TEMPORAL or params.integration == Integrations.ALL:
# run the temporal aggregation
aggregator.aggregate_scale(aggregation_scale='temporal', executor=executor)

if params.integration == Integrations.SPATIAL or params.integration == Integrations.ALL:
# run the spatial aggregation
aggregator.aggregate_scale(aggregation_scale='spatial', executor=executor)

if params.integration == Integrations.SPATIO_TEMPORAL or params.integration == Integrations.ALL:
# run the spatio-temporal aggregation
aggregator.aggregate_scale(aggregation_scale='spatiotemporal', executor=executor)

# wait until all results are finished
executor.shutdown(wait=True)
logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")


# finally run a third pool to generate reports
with PoolExecutor() as executor:
logger.debug(f"START {type(executor).__name__} - Pool to generate final reports.")

# create a callback to log exceptions
def callback(future):
exc = future.exception()
if exc is not None:
logger.exception(exc)

# generate the profile report - start first as this one might potentially take longer
# TODO: there should be an option to disable this
executor.submit(reporter.generate_profile_report).add_done_callback(callback)
# generate the profile report - start first as this one might potentially take longer
# TODO: there should be an option to disable this
executor.submit(reporter.generate_profile_report).add_done_callback(callback)

# generate the readme
executor.submit(reporter.generate_readme).add_done_callback(callback)
# generate the readme
executor.submit(reporter.generate_readme).add_done_callback(callback)


# wait until all results are finished
executor.shutdown(wait=True)
logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")
# wait until all results are finished
executor.shutdown(wait=True)
logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")
# --------------------------------------------------------------------------- #


# we're finished.
t2 = time.time()
logger.info(f"Total runtime: {t2-tool_start:.2f} seconds.")
logger.info(f"Total runtime: {t2 - tool_start:.2f} seconds.")

# print out the report
with open('/out/processing.log', 'r') as f:
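The done-callback attached to each submitted report task is what surfaces exceptions from the worker threads: a Future stores its exception and only raises it when someone calls result() or exception(), so without the callback a failed report would pass silently. The same pattern in isolation, with a dummy task for illustration only:

    import logging
    from concurrent.futures import ThreadPoolExecutor

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    def callback(future):
        # exception() returns None if the task finished without raising
        exc = future.exception()
        if exc is not None:
            logger.exception(exc)

    def failing_task():
        raise RuntimeError('boom')        # dummy task

    with ThreadPoolExecutor() as executor:
        executor.submit(failing_task).add_done_callback(callback)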
