From cd726240cb2e5081debcc5a68858506a6b213292 Mon Sep 17 00:00:00 2001 From: david-i-berry Date: Mon, 5 Feb 2024 12:18:22 +0100 Subject: [PATCH] Warnings updated, one list per thread. --- csv2bufr/__init__.py | 58 ++++++++++++++++++++++-------------------- tests/test_csv2bufr.py | 12 +++++++-- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/csv2bufr/__init__.py b/csv2bufr/__init__.py index 0f7039d..9e4ce39 100644 --- a/csv2bufr/__init__.py +++ b/csv2bufr/__init__.py @@ -28,6 +28,7 @@ import json import logging import os.path +import threading from typing import Any, Iterator, Union from eccodes import (codes_bufr_new_from_samples, @@ -91,7 +92,7 @@ 'typicalSecond': 'const:0' } -_warnings = [] +_warnings_global = {} # status codes FAILED = 0 @@ -121,7 +122,8 @@ # function to find position in array of requested element def index_(key, mapping): - global _warnings + global _warnings_global + tidx = f"t-{threading.get_ident()}" idx = 0 for item in mapping: if item['eccodes_key'] == key: @@ -130,7 +132,7 @@ def index_(key, mapping): if NULLIFY_INVALID: msg = f"Warning: key {key} not found in {mapping}" LOGGER.warning(msg) - _warnings.append(msg) + _warnings_global[tidx].append(msg) return None else: msg = f"Error: key {key} not found in {mapping}" @@ -139,7 +141,9 @@ def index_(key, mapping): def parse_value(element: str, data: dict): - global _warnings + global _warnings_global + tidx = f"t-{threading.get_ident()}" + data_type = element.split(":") if data_type[0] == "const": value = data_type[1] @@ -153,7 +157,7 @@ def parse_value(element: str, data: dict): msg = f"Column {column} not found in input data: {data}" if NULLIFY_INVALID: LOGGER.warning(msg) # noqa - _warnings.append(msg) + _warnings_global[tidx].append(msg) else: # LOGGER.error(msg) # noqa raise ValueError(msg) @@ -179,7 +183,8 @@ def parse_value(element: str, data: dict): # function to retrieve data def get_(key: str, mapping: dict, data: dict): - global _warnings + global _warnings_global + tidx = f"t-{threading.get_ident()}" # get position in mapping try: idx = index_(key, mapping) @@ -189,7 +194,7 @@ def get_(key: str, mapping: dict, data: dict): msg = f"Warning ({e}) raised getting value for {key}, None returned for {key}" # noqa if NULLIFY_INVALID: LOGGER.warning(msg) # noqa - _warnings.append(msg) + _warnings_global[tidx].append(msg) value = None else: raise KeyError(msg) @@ -240,7 +245,8 @@ def validate_value(key: str, value: Union[NUMBERS], :returns: validated value """ - global _warnings + global _warnings_global + tidx = f"t-{threading.get_ident()}" # TODO move this function to the class as part of set value if value is None: @@ -254,7 +260,7 @@ def validate_value(key: str, value: Union[NUMBERS], msg = f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max})." # noqa if nullify_on_fail: LOGGER.warning(f"{msg}; Element set to missing") - _warnings.append(f"{msg}; Element set to missing") + _warnings_global[tidx].append(f"{msg}; Element set to missing") return None else: raise ValueError(msg) @@ -279,7 +285,7 @@ def __init__(self, descriptors: list, :param table_version: version of Master Table 0 to use, default 36 """ - global _warnings + self.warnings = [] # ================================ # first create empty bufr messages # ================================ @@ -350,7 +356,6 @@ def __init__(self, descriptors: list, # ============================================ def create_template(self) -> None: - global _warnings template = {} template["inputDelayedDescriptorReplicationFactor"] = \ self.delayed_replications @@ -420,6 +425,7 @@ def reset(self) -> None: for key in self.dict: self.dict[key]["value"] = None self.bufr = None + self.warnings = [] def set_element(self, key: str, value: object) -> None: """ @@ -430,7 +436,6 @@ def set_element(self, key: str, value: object) -> None: :returns: `None` """ - global _warnings # TODO move value validation here if value is not None and not isinstance(value, list): @@ -445,7 +450,7 @@ def set_element(self, key: str, value: object) -> None: if NULLIFY_INVALID: value = None LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa - _warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa + self.warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa else: raise RuntimeError(f"{e}: Unable to convert value {value} to int for {key}") # noqa elif expected_type == "float" and not isinstance(value, float): @@ -455,7 +460,7 @@ def set_element(self, key: str, value: object) -> None: if NULLIFY_INVALID: value = None LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa - _warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa + self.warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa else: raise RuntimeError(f"{e}: Unable to convert value {value} to float for {key}") # noqa else: @@ -470,7 +475,6 @@ def get_element(self, key: str) -> Any: :returns: value of the element """ - global _warnings result = None try: # check if we want value or an attribute (indicated by ->) @@ -484,7 +488,7 @@ def get_element(self, key: str) -> Any: if NULLIFY_INVALID: result = None LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa - _warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa + self.warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa else: msg = f"Error {e} whilst fetching {key} from data." raise RuntimeError(msg) @@ -500,7 +504,6 @@ def as_bufr(self, use_cached: bool = False) -> bytes: :returns: bytes containing BUFR data """ - global _warnings if use_cached and (self.bufr is not None): return self.bufr # =========================== @@ -540,7 +543,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes: except CodesInternalError as e: msg = f"Error ({e}) calling codes_set({bufr_msg}, 'pack', True). Null message returned" # noqa LOGGER.warning(f"{msg}") # noqa - _warnings.append(f"{msg}") # noqa + self.warnings.append(f"{msg}") # noqa codes_release(bufr_msg) return self.bufr except Exception as e: @@ -579,7 +582,6 @@ def md5(self) -> str: :returns: md5 of BUFR message """ - global _warnings return self._hash def parse(self, data: dict, mappings: dict) -> None: @@ -598,7 +600,6 @@ def parse(self, data: dict, mappings: dict) -> None: # ================================================== # Parse the data. # ================================================== - global _warnings for section in ("header", "data"): for element in mappings[section]: # get eccodes key @@ -634,7 +635,7 @@ def parse(self, data: dict, mappings: dict) -> None: except Exception as e: if NULLIFY_INVALID: LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa - _warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa + self.warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa value = None else: # LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa @@ -674,7 +675,6 @@ def get_datetime(self) -> datetime: :returns: `datetime.datetime` of ISO8601 representation of the characteristic date/time """ - global _warnings if None in [ self.get_element("typicalYear"), self.get_element("typicalMonth"), @@ -733,7 +733,9 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: :returns: iterator """ - global _warnings + global _warnings_global + job_id = f"t-{threading.get_ident()}" # job ID based on thread + _warnings_global[job_id] = [] # ====================== # validate mapping files # ====================== @@ -797,7 +799,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: if _delimiter not in [",", ";", "|", "\t"]: msg = "Invalid delimiter specified in mapping template, reverting to comma ','" # noqa LOGGER.warning(msg) - _warnings.append(msg) + _warnings_global[job_id].append(msg) _delimiter = "," else: _delimiter = DELIMITER @@ -865,7 +867,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: if NULLIFY_INVALID: msg = f"csv read error, non ASCII data detected ({val}), skipping row" # noqa LOGGER.warning(msg) # noqa - _warnings.append(msg) + _warnings_global[job_id].append(msg) LOGGER.debug(row) continue else: @@ -913,7 +915,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: "code": PASSED, "message": "", "errors": [], - "warnings": _warnings + "warnings": message.warnings + _warnings_global[job_id] } cksum = message.md5() # now identifier based on WSI and observation date as identifier @@ -948,7 +950,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: "code": FAILED, "message": "Error encoding row, BUFR set to None", "errors": [f"{msg}\n\t\tData: {data_dict}"], - "warnings": _warnings + "warnings": message.warnings + _warnings_global[job_id] } result["_meta"] = { "id": None, @@ -971,6 +973,6 @@ def transform(data: str, mappings: dict) -> Iterator[dict]: # now yield result back to caller yield result # clear warnings - _warnings = [] + del _warnings_global[job_id] fh.close() diff --git a/tests/test_csv2bufr.py b/tests/test_csv2bufr.py index 9fef035..bf41757 100644 --- a/tests/test_csv2bufr.py +++ b/tests/test_csv2bufr.py @@ -21,20 +21,25 @@ import csv import os +import threading from io import StringIO import logging +import threading from eccodes import (codes_bufr_new_from_samples, codes_release) import pytest from csv2bufr import (apply_scaling, validate_value, - transform, SUCCESS) + transform, SUCCESS, _warnings_global) import csv2bufr.templates as c2bt LOGGER = logging.getLogger(__name__) LOGGER.setLevel("DEBUG") +# set up warnings dict +tidx = f"t-{threading.get_ident()}" +_warnings_global[tidx] = [] # test data @pytest.fixture @@ -216,12 +221,15 @@ def test_validate_value_fail(): # test to check that valid_value returns null value when we expect it to def test_validate_value_nullify(): input_value = 10.0 + tid = f"t-{threading.get_ident()}" + if tid not in _warnings_global: + _warnings_global[tid] = [] try: value = validate_value("test value", input_value, 0.0, 9.9, True) except Exception: assert False assert value is None - + del _warnings_global[tid] # check that test transform works def test_transform(data_dict, mapping_dict):