Skip to content

Commit

Permalink
internal table loading is implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaelicke committed Jan 19, 2024
1 parent 8828017 commit 6f98b14
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@ dmypy.json
# project specific
out
pg_data
.DS_Store
6 changes: 3 additions & 3 deletions in/inputs.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"vforwater_loader": {
"parameters": {
"dataset_ids": [
42,
43
1,
2
],
"start_date": "2000-01-01T12:00:00+01",
"end_date": "2015-01-01T12:00:00+01",
"end_date": "2014-01-01T12:00:00+01",
"reference_area": {
"type": "Feature",
"geometry": {
Expand Down
97 changes: 90 additions & 7 deletions src/run.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import os
from datetime import datetime as dt
from dotenv import load_dotenv
from pathlib import Path
import shutil
import json

from json2args import get_parameter
from dotenv import load_dotenv
from metacatalog import api
from tqdm import tqdm

import pandas as pd
import xarray as xr

# parse parameters
kwargs = get_parameter()
Expand Down Expand Up @@ -37,16 +44,72 @@
# check if a connection evironment variable is given
if 'VFW_POSTGRES_URI' in os.environ:
connection = os.environ['VFW_POSTGRES_URI']
elif 'METACATALOG_URI' in os.environ:
connection = os.environ['METACATALOG_URI']
else:
connection = None

# if we could not derive a connection, we hope for the best and hope that
# defaults are used
session = api.connect_database(connection)

# container for errors as thing could go wrong from here
errors = []
logs = []

# load the datasets
for dataset_id in tqdm(dataset_ids):
try:
results = api.find_entry(session, id=dataset_id, as_result=False)
if len(results) > 1:
raise Exception(f"Found more than one dataset with id {dataset_id}. (len = {len(results)})")
elif len(results) == 0:
raise Exception(f"Found no dataset with id {dataset_id}.")

# load the dataset
entry = results[0]

# load the data
data = entry.get_data(start=start, end=end)
name = f"/out/{entry.variable.name.replace(' ', '_')}_{entry.id}"

# pandas
if isinstance(data, pd.DataFrame):
# save to output folder
data.to_parquet(f"{name}.parquet")
logs.append(f"Saved dataset ID={entry.id} to {name}.parquet")

# xarray
elif isinstance(data, xr.Dataset):
data.to_netcdf(f"{name}.nc")
logs.append(f"Saved dataset ID={entry.id} to {name}.nc")

# path
elif isinstance(data, str):
if Path(data).exists():
new_loc = Path('/out') / Path(data).name
shutil.copy(data, str(new_loc))
logs.append(f"Saved dataset ID={entry.id} to {str(new_loc)}")

# finally save the metadata - only JSON for now
with open(f"{name}.json", 'w') as f:
json.dump(entry.to_dict(deep=True, stringify=True), f, indent=4)


except Exception as e:
errors.append(f"ERRORED STEP LOAD DATASET: {str(e)}")
continue



# ----------------------------
# FROM HERE, ONLY DEVELOPMENT
# output report

log_messages = "\n".join([f" - {log}" for log in logs])

# build the message for now
MSG = f"""
This is the V-FOR-WaTer data loader. Ufortunately, this tool is not yet implemented.
This is the V-FOR-WaTer data loader report
The following information has been submitted to the tool:
Expand All @@ -58,13 +121,33 @@
{', '.join(map(str, dataset_ids))}
DATABASE CONNECTION: {connection is not None}
DATABASE URI: {connection[:10] + '***' + connection[-6:] if connection is not None else 'N.A.'}
DATABASE URI: {session.bind}
PROCESSING LOGS
---------------
{log_messages}
NOTE
----
The current version of the tool does not yet support clipping by reference area.
We are on it.
"""

# RUN the tool here and create the output in /out
# Add error messages
if len(errors) > 0:
MSG += "\n\nERRORS:\n-------\nUnfortunately, there were errors during the processing of this tool. Please read them carefully:\n\n"
MSG += "\n\n".join(errors)

with open('/out/errors.txt', 'w') as f:
f.write("\n".join(errors))

# print out the report
print(MSG)

# save
with open('/out/result.txt', 'w') as f:
# save it
with open('/out/process.txt', 'w') as f:
f.write(MSG)

# save the logs on their own
with open('/out/logs.txt', 'w') as f:
f.write("\n".join(logs))

0 comments on commit 6f98b14

Please sign in to comment.