Skip to content

Commit

Permalink
Warnings updated, one list per thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
david-i-berry committed Feb 5, 2024
1 parent e8d65a6 commit cd72624
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
58 changes: 30 additions & 28 deletions csv2bufr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os.path
import threading
from typing import Any, Iterator, Union

from eccodes import (codes_bufr_new_from_samples,
Expand Down Expand Up @@ -91,7 +92,7 @@
'typicalSecond': 'const:0'
}

_warnings = []
_warnings_global = {}

# status codes
FAILED = 0
Expand Down Expand Up @@ -121,7 +122,8 @@

# function to find position in array of requested element
def index_(key, mapping):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
idx = 0
for item in mapping:
if item['eccodes_key'] == key:
Expand All @@ -130,7 +132,7 @@ def index_(key, mapping):
if NULLIFY_INVALID:
msg = f"Warning: key {key} not found in {mapping}"
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[tidx].append(msg)
return None
else:
msg = f"Error: key {key} not found in {mapping}"
Expand All @@ -139,7 +141,9 @@ def index_(key, mapping):


def parse_value(element: str, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"

data_type = element.split(":")
if data_type[0] == "const":
value = data_type[1]
Expand All @@ -153,7 +157,7 @@ def parse_value(element: str, data: dict):
msg = f"Column {column} not found in input data: {data}"
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
else:
# LOGGER.error(msg) # noqa
raise ValueError(msg)
Expand All @@ -179,7 +183,8 @@ def parse_value(element: str, data: dict):

# function to retrieve data
def get_(key: str, mapping: dict, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# get position in mapping
try:
idx = index_(key, mapping)
Expand All @@ -189,7 +194,7 @@ def get_(key: str, mapping: dict, data: dict):
msg = f"Warning ({e}) raised getting value for {key}, None returned for {key}" # noqa
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
value = None
else:
raise KeyError(msg)
Expand Down Expand Up @@ -240,7 +245,8 @@ def validate_value(key: str, value: Union[NUMBERS],
:returns: validated value
"""
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# TODO move this function to the class as part of set value

if value is None:
Expand All @@ -254,7 +260,7 @@ def validate_value(key: str, value: Union[NUMBERS],
msg = f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max})." # noqa
if nullify_on_fail:
LOGGER.warning(f"{msg}; Element set to missing")
_warnings.append(f"{msg}; Element set to missing")
_warnings_global[tidx].append(f"{msg}; Element set to missing")
return None
else:
raise ValueError(msg)
Expand All @@ -279,7 +285,7 @@ def __init__(self, descriptors: list,
:param table_version: version of Master Table 0 to use, default 36
"""
global _warnings
self.warnings = []
# ================================
# first create empty bufr messages
# ================================
Expand Down Expand Up @@ -350,7 +356,6 @@ def __init__(self, descriptors: list,
# ============================================

def create_template(self) -> None:
global _warnings
template = {}
template["inputDelayedDescriptorReplicationFactor"] = \
self.delayed_replications
Expand Down Expand Up @@ -420,6 +425,7 @@ def reset(self) -> None:
for key in self.dict:
self.dict[key]["value"] = None
self.bufr = None
self.warnings = []

def set_element(self, key: str, value: object) -> None:
"""
Expand All @@ -430,7 +436,6 @@ def set_element(self, key: str, value: object) -> None:
:returns: `None`
"""
global _warnings
# TODO move value validation here

if value is not None and not isinstance(value, list):
Expand All @@ -445,7 +450,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to int for {key}") # noqa
elif expected_type == "float" and not isinstance(value, float):
Expand All @@ -455,7 +460,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to float for {key}") # noqa
else:
Expand All @@ -470,7 +475,6 @@ def get_element(self, key: str) -> Any:
:returns: value of the element
"""
global _warnings
result = None
try:
# check if we want value or an attribute (indicated by ->)
Expand All @@ -484,7 +488,7 @@ def get_element(self, key: str) -> Any:
if NULLIFY_INVALID:
result = None
LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa
_warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
self.warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
else:
msg = f"Error {e} whilst fetching {key} from data."
raise RuntimeError(msg)
Expand All @@ -500,7 +504,6 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
:returns: bytes containing BUFR data
"""
global _warnings
if use_cached and (self.bufr is not None):
return self.bufr
# ===========================
Expand Down Expand Up @@ -540,7 +543,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
except CodesInternalError as e:
msg = f"Error ({e}) calling codes_set({bufr_msg}, 'pack', True). Null message returned" # noqa
LOGGER.warning(f"{msg}") # noqa
_warnings.append(f"{msg}") # noqa
self.warnings.append(f"{msg}") # noqa
codes_release(bufr_msg)
return self.bufr
except Exception as e:
Expand Down Expand Up @@ -579,7 +582,6 @@ def md5(self) -> str:
:returns: md5 of BUFR message
"""
global _warnings
return self._hash

def parse(self, data: dict, mappings: dict) -> None:
Expand All @@ -598,7 +600,6 @@ def parse(self, data: dict, mappings: dict) -> None:
# ==================================================
# Parse the data.
# ==================================================
global _warnings
for section in ("header", "data"):
for element in mappings[section]:
# get eccodes key
Expand Down Expand Up @@ -634,7 +635,7 @@ def parse(self, data: dict, mappings: dict) -> None:
except Exception as e:
if NULLIFY_INVALID:
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
_warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
self.warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
value = None
else:
# LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa
Expand Down Expand Up @@ -674,7 +675,6 @@ def get_datetime(self) -> datetime:
:returns: `datetime.datetime` of ISO8601 representation of the
characteristic date/time
"""
global _warnings
if None in [
self.get_element("typicalYear"),
self.get_element("typicalMonth"),
Expand Down Expand Up @@ -733,7 +733,9 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
:returns: iterator
"""
global _warnings
global _warnings_global
job_id = f"t-{threading.get_ident()}" # job ID based on thread
_warnings_global[job_id] = []
# ======================
# validate mapping files
# ======================
Expand Down Expand Up @@ -797,7 +799,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if _delimiter not in [",", ";", "|", "\t"]:
msg = "Invalid delimiter specified in mapping template, reverting to comma ','" # noqa
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[job_id].append(msg)
_delimiter = ","
else:
_delimiter = DELIMITER
Expand Down Expand Up @@ -865,7 +867,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if NULLIFY_INVALID:
msg = f"csv read error, non ASCII data detected ({val}), skipping row" # noqa
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[job_id].append(msg)
LOGGER.debug(row)
continue
else:
Expand Down Expand Up @@ -913,7 +915,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": PASSED,
"message": "",
"errors": [],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
cksum = message.md5()
# now identifier based on WSI and observation date as identifier
Expand Down Expand Up @@ -948,7 +950,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": FAILED,
"message": "Error encoding row, BUFR set to None",
"errors": [f"{msg}\n\t\tData: {data_dict}"],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
result["_meta"] = {
"id": None,
Expand All @@ -971,6 +973,6 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
# now yield result back to caller
yield result
# clear warnings
_warnings = []
del _warnings_global[job_id]

fh.close()
12 changes: 10 additions & 2 deletions tests/test_csv2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@

import csv
import os
import threading
from io import StringIO
import logging
import threading

from eccodes import (codes_bufr_new_from_samples, codes_release)
import pytest

from csv2bufr import (apply_scaling, validate_value,
transform, SUCCESS)
transform, SUCCESS, _warnings_global)

import csv2bufr.templates as c2bt

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel("DEBUG")

# set up warnings dict
tidx = f"t-{threading.get_ident()}"
_warnings_global[tidx] = []

# test data
@pytest.fixture
Expand Down Expand Up @@ -216,12 +221,15 @@ def test_validate_value_fail():
# test to check that validate_value returns null value when we expect it to
def test_validate_value_nullify():
    """Check that validate_value returns None for an out-of-range value.

    validate_value records its warning in ``_warnings_global`` under a key
    derived from the calling thread's identifier, so a list must be
    registered under that key before the call.  The entry is removed
    afterwards only if this test created it, and the removal is done in a
    ``finally`` clause so a failing assertion cannot leak state into later
    tests.
    """
    input_value = 10.0
    tid = f"t-{threading.get_ident()}"
    # Remember whether the list was already registered (e.g. at module
    # import) so that shared state owned by someone else is left in place.
    created_here = tid not in _warnings_global
    if created_here:
        _warnings_global[tid] = []
    try:
        # 10.0 is outside 0.0 - 9.9; the trailing True is presumably the
        # nullify-on-fail flag (TODO confirm against validate_value).
        # Any exception raised here propagates and fails the test with its
        # full traceback instead of being masked by a bare `assert False`.
        value = validate_value("test value", input_value, 0.0, 9.9, True)
        assert value is None
    finally:
        if created_here:
            del _warnings_global[tid]

# check that test transform works
def test_transform(data_dict, mapping_dict):
Expand Down

0 comments on commit cd72624

Please sign in to comment.