Skip to content

Commit

Permalink
Merge pull request #117 from wmo-im/warnings
Browse files Browse the repository at this point in the history
Improved warnings / error handling.
  • Loading branch information
david-i-berry authored Sep 1, 2023
2 parents f446b5e + 345dc20 commit 9f3b18e
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 44 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ ENV DEBIAN_FRONTEND="noninteractive" \
PATH="${PATH}:/opt/eccodes/bin"

RUN apt-get update -y \
&& apt-get install -y vim emacs nedit nano
&& apt-get install -y vim emacs nedit nano git \
&& pip3 install --no-cache-dir git+https://github.com/wmo-im/csv2bufr-templates@main # ToDo - move to requirements.txt

WORKDIR /tmp

Expand Down
93 changes: 60 additions & 33 deletions csv2bufr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
'typicalSecond': 'const:0'
}

_warnings = []

# status codes
FAILED = 0
Expand Down Expand Up @@ -123,19 +124,24 @@

# function to find position in array of requested element
def index_(key, mapping):
    """
    Return the position of the first element in ``mapping`` whose
    ``'eccodes_key'`` entry equals ``key``.

    :param key: ecCodes key to search for
    :param mapping: iterable of mapping elements (dicts containing an
                    ``'eccodes_key'`` entry)

    :returns: zero-based index of the matching element, or ``None`` when
              the key is absent and the module-level ``NULLIFY_INVALID``
              flag is set

    :raises ValueError: when the key is absent and ``NULLIFY_INVALID``
                        is False
    """
    global _warnings
    idx = 0
    for item in mapping:
        if item['eccodes_key'] == key:
            return idx
        idx += 1
    # Key not found: either record a warning and signal "missing" with
    # None, or fail hard, depending on NULLIFY_INVALID.
    msg = f"key {key} not found in {mapping}"
    if NULLIFY_INVALID:
        LOGGER.warning(msg)
        # _warnings is collected per-row and attached to the result status
        _warnings.append(msg)
        return None
    else:
        LOGGER.error(msg)
        raise ValueError


def parse_value(element: str, data: dict):
global _warnings
data_type = element.split(":")
if data_type[0] == "const":
value = data_type[1]
Expand All @@ -146,11 +152,14 @@ def parse_value(element: str, data: dict):
elif data_type[0] == "data":
column = data_type[1]
if column not in data:
msg = f"Column {column} not found in input data: {data}"
if NULLIFY_INVALID:
LOGGER.warning(f"Column {column} not found in input data: {data}") # noqa
LOGGER.warning("HERE")
LOGGER.warning(msg) # noqa
_warnings.append(msg)
else:
LOGGER.error(f"Column {column} not found in input data: {data}") # noqa
raise ValueError
LOGGER.error(msg) # noqa
raise ValueError
value = data[column]
elif data_type[0] == "array":
value = data_type[1]
Expand All @@ -172,14 +181,17 @@ def parse_value(element: str, data: dict):

# function to retrieve data
def get_(key: str, mapping: dict, data: dict):
global _warnings
# get position in mapping
try:
idx = index_(key, mapping)
element = mapping[idx]
value = parse_value(element['value'], data)
except Exception as e:
msg = f"Warning raised getting value for {key}, None returned for {key}" # noqa
if NULLIFY_INVALID:
LOGGER.warning(f"Warning raised getting value for {key}, None returned for {key}") # noqa
LOGGER.warning(msg) # noqa
_warnings.append(msg)
value = None
else:
raise e
Expand All @@ -197,7 +209,7 @@ def validate_mapping(mapping: dict) -> bool:
:returns: `bool` of validation result
"""

global _warnings
# load internal file schema for mappings
file_schema = f"{MAPPINGS}{os.sep}mapping_schema.json"
with open(file_schema) as fh:
Expand Down Expand Up @@ -230,7 +242,7 @@ def apply_scaling(value: Union[NUMBERS], scale: Union[NUMBERS],
:returns: scaled value
"""

global _warnings
if isinstance(value, NUMBERS):
if None not in [scale, offset]:
try:
Expand All @@ -257,7 +269,7 @@ def validate_value(key: str, value: Union[NUMBERS],
:returns: validated value
"""

global _warnings
# TODO move this function to the class as part of set value

if value is None:
Expand All @@ -271,6 +283,7 @@ def validate_value(key: str, value: Union[NUMBERS],
e = ValueError(f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max}).") # noqa
if nullify_on_fail:
LOGGER.warning(f"{e}; Element set to missing")
_warnings.append(f"{e}; Element set to missing")
return None
else:
# LOGGER.error(str(e))
Expand All @@ -296,6 +309,7 @@ def __init__(self, descriptors: list,
:param table_version: version of Master Table 0 to use, default 36
"""
global _warnings
# ================================
# first create empty bufr messages
# ================================
Expand Down Expand Up @@ -359,9 +373,11 @@ def __init__(self, descriptors: list,
self.extended_delayed_replications = \
extended_delayed_replications # used when encoding
self.bufr = None # placeholder for BUFR bytes
self._hash = None # placeholder for hash of data
# ============================================

def create_template(self) -> None:
global _warnings
template = {}
template["inputDelayedDescriptorReplicationFactor"] = \
self.delayed_replications
Expand Down Expand Up @@ -441,7 +457,7 @@ def set_element(self, key: str, value: object) -> None:
:returns: `None`
"""

global _warnings
# TODO move value validation here

if value is not None and not isinstance(value, list):
Expand All @@ -456,6 +472,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
else:
raise e
elif expected_type == "float" and not isinstance(value, float):
Expand All @@ -465,6 +482,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
else:
raise e
else:
Expand All @@ -479,6 +497,7 @@ def get_element(self, key: str) -> Any:
:returns: value of the element
"""
global _warnings
result = None
try:
# check if we want value or an attribute (indicated by ->)
Expand All @@ -492,6 +511,7 @@ def get_element(self, key: str) -> Any:
if NULLIFY_INVALID:
result = None
LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa
_warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
else:
msg = f"Error {e} whilst fetching {key} from data"
raise RuntimeError(msg)
Expand All @@ -507,7 +527,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
:returns: bytes containing BUFR data
"""

global _warnings
if use_cached and (self.bufr is not None):
return self.bufr
# ===========================
Expand Down Expand Up @@ -544,12 +564,12 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
try:
codes_set(bufr_msg, "pack", True)
except CodesInternalError as e:
LOGGER.warning(f"error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.warning("null message returned")
LOGGER.warning(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}. Null message returned") # noqa
_warnings.append(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}. Null message returned") # noqa
codes_release(bufr_msg)
return self.bufr
except Exception as e:
LOGGER.error(f"error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.error(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.error(json.dumps(self.dict, indent=4))
raise e
# =======================================================
Expand All @@ -567,6 +587,13 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
# Return BUFR message bytes
# =============================================
self.bufr = fh.read()
try:
# set hash
self._hash = hashlib.md5(self.bufr).hexdigest()
except Exception as e:
LOGGER.error(f"Error calculating hash (md5) of BUFR string: {self.bufr}") # noqa
raise e

return self.bufr

def md5(self) -> str:
Expand All @@ -575,12 +602,8 @@ def md5(self) -> str:
:returns: md5 of BUFR message
"""
bufr = self.as_bufr(use_cached=True)

if bufr is not None:
return hashlib.md5(bufr).hexdigest()
else:
return None
global _warnings
return self._hash

def parse(self, data: dict, mappings: dict) -> None:
"""
Expand All @@ -598,6 +621,7 @@ def parse(self, data: dict, mappings: dict) -> None:
# ==================================================
# Parse the data.
# ==================================================
global _warnings
for section in ("header", "data"):
for element in mappings[section]:
# get eccodes key
Expand Down Expand Up @@ -631,8 +655,8 @@ def parse(self, data: dict, mappings: dict) -> None:
NULLIFY_INVALID)
except Exception as e:
if NULLIFY_INVALID:
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None") # noqa
LOGGER.warning(f"data: {data}")
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
_warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
value = None
else:
# LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa
Expand All @@ -655,7 +679,7 @@ def parse(self, data: dict, mappings: dict) -> None:
value = apply_scaling(value, scale, offset)
except Exception as e:
LOGGER.error(f"Error scaling data: scale={scale}, offet={offset}, value={value}") # noqa
LOGGER.error(f"data: {data}")
LOGGER.debug(f"data: {data}")
raise e

# ==================================================
Expand All @@ -671,7 +695,7 @@ def get_datetime(self) -> datetime:
:returns: `datetime.datetime` of ISO8601 representation of the
characteristic date/time
"""

global _warnings
if None in [
self.get_element("typicalYear"),
self.get_element("typicalMonth"),
Expand Down Expand Up @@ -730,7 +754,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
:returns: iterator
"""

global _warnings
# ======================
# validate mapping files
# ======================
Expand Down Expand Up @@ -855,7 +879,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if not val.isascii():
if NULLIFY_INVALID:
LOGGER.error(f"csv read error, non ASCII data detected ({val}), skipping row") # noqa
LOGGER.error(row)
LOGGER.debug(row)
continue
else:
raise ValueError
Expand All @@ -881,7 +905,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
data_dict["_wsi_local"] = wsi_local
except Exception as e:
LOGGER.error("Error parsing WIGOS station identifier")
LOGGER.error(f"data:{data_dict}")
LOGGER.debug(f"data:{data_dict}")
raise ValueError(e)
# reset BUFR message to clear data
message.reset()
Expand All @@ -894,7 +918,8 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
status = {
"code": PASSED,
"message": "",
"errors": []
"errors": [],
"warnings": _warnings
}
cksum = message.md5()
# now identifier based on WSI and observation date as identifier
Expand All @@ -921,14 +946,14 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
}

except Exception as e:
LOGGER.error(e)
LOGGER.error("Error encoding BUFR, BUFR set to None")
LOGGER.error(f"data:{data_dict}")
LOGGER.error(f"Error encoding BUFR, BUFR set to None\n\t{e}")
LOGGER.debug(f"data:{data_dict}")
result["bufr4"] = None
status = {
"code": FAILED,
"message": "Error encoding row, BUFR set to None",
"errors": [f"Error: {e}\n\t\tData: {data_dict}"]
"errors": [f"Error: {e}\n\t\tData: {data_dict}"],
"warnings": _warnings
}
result["_meta"] = {
"id": None,
Expand All @@ -948,7 +973,9 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:

time_ = datetime.now(timezone.utc).isoformat()
LOGGER.info(f"{time_}|{result['_meta']}")

# now yield result back to caller
yield result
# clear warnings
_warnings = []

fh.close()
24 changes: 14 additions & 10 deletions csv2bufr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import click

from csv2bufr import __version__, BUFRMessage, transform as transform_csv
import csv2bufr_templates as c2bt

THISDIR = os.path.dirname(os.path.realpath(__file__))
MAPPINGS = f"{THISDIR}{os.sep}resources{os.sep}mappings"
Expand Down Expand Up @@ -108,18 +109,21 @@ def transform(ctx, csv_file, mapping, output_dir, verbosity): # noqa
result = None
click.echo(f"\nCLI:\t... Transforming {csv_file.name} to BUFR ...")

# identify mapping to use
# load / identify mapping to use
if not os.path.isfile(mapping):
mappings_file = f"{MAPPINGS}{os.sep}{mapping}.json"
if not os.path.isfile(mappings_file):
raise click.ClickException(
f"Invalid stored mapping ({mappings_file})")
try:
mappings = c2bt.load_template(mapping)
if mappings is None:
raise click.ClickException(
f"Error loading mappings {mapping}")
except Exception as err:
raise click.ClickException(err)
else:
mappings_file = mapping

# load mappings
with open(mappings_file) as fh:
mappings = json.load(fh)
try:
with open(mapping) as fh:
mappings = json.load(fh)
except Exception as err:
raise click.ClickException(err)

try:
result = transform_csv(csv_file.read(), mappings)
Expand Down

0 comments on commit 9f3b18e

Please sign in to comment.