Skip to content

Commit

Permalink
Template refactor (#125)
Browse files Browse the repository at this point in the history
* Template files removed from repo and back to csv2bufr-templates.
Metadata added to template schema.

* Update to mapping schema filename.

* Update to tests.yml to install templates before testing.

* Flake 8 fix.

* Update to repo used for templates.

* Update to repo used for templates.

* change requested, library ordering.

* Updates to enforce BUFR_ORIGINATING_CENTRE and BUFR_ORIGINATING_SUBCENTRE to be set.

* Update tests

* debugging GH tests workflow.

* flake

* defaults

* update GH tests.yml

* Docs.

* Docs.

* Warnings updated, one list per thread.

* Warnings updated, one list per thread.

* flake8

* typo's in docs.

* Update tests.yml

* Update installation.rst

---------

Co-authored-by: Tom Kralidis <[email protected]>
  • Loading branch information
david-i-berry and tomkralidis authored Feb 6, 2024
1 parent 56d6876 commit 8be9605
Show file tree
Hide file tree
Showing 14 changed files with 417 additions and 514 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ jobs:
pip3 install -r requirements.txt
pip3 install -r requirements-dev.txt
python3 setup.py install
mkdir -p /opt/csv2bufr
cd /opt/csv2bufr
wget https://github.com/wmo-im/csv2bufr-templates/archive/refs/tags/v0.1.tar.gz
tar -zxf v0.1.tar.gz --strip-components=1 csv2bufr-templates-0.1/templates
- name: run tests ⚙️
run: |
pytest
Expand Down
93 changes: 33 additions & 60 deletions csv2bufr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os.path
import threading
from typing import Any, Iterator, Union

from eccodes import (codes_bufr_new_from_samples,
Expand All @@ -38,7 +39,7 @@
codes_bufr_keys_iterator_delete,
codes_bufr_keys_iterator_get_name, CodesInternalError)

from jsonschema import validate
import csv2bufr.templates as c2bt

# some 'constants'
SUCCESS = True
Expand All @@ -56,9 +57,6 @@

LOGGER = logging.getLogger(__name__)

THISDIR = os.path.dirname(os.path.realpath(__file__))
MAPPINGS = f"{THISDIR}{os.sep}resources{os.sep}mappings"

BUFR_TABLE_VERSION = 38 # default BUFR table version
# list of BUFR attributes
ATTRIBUTES = ['code', 'units', 'scale', 'reference', 'width']
Expand Down Expand Up @@ -94,7 +92,7 @@
'typicalSecond': 'const:0'
}

_warnings = []
_warnings_global = {}

# status codes
FAILED = 0
Expand Down Expand Up @@ -124,7 +122,8 @@

# function to find position in array of requested element
def index_(key, mapping):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
idx = 0
for item in mapping:
if item['eccodes_key'] == key:
Expand All @@ -133,7 +132,7 @@ def index_(key, mapping):
if NULLIFY_INVALID:
msg = f"Warning: key {key} not found in {mapping}"
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[tidx].append(msg)
return None
else:
msg = f"Error: key {key} not found in {mapping}"
Expand All @@ -142,7 +141,9 @@ def index_(key, mapping):


def parse_value(element: str, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"

data_type = element.split(":")
if data_type[0] == "const":
value = data_type[1]
Expand All @@ -156,7 +157,7 @@ def parse_value(element: str, data: dict):
msg = f"Column {column} not found in input data: {data}"
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
else:
# LOGGER.error(msg) # noqa
raise ValueError(msg)
Expand All @@ -182,7 +183,8 @@ def parse_value(element: str, data: dict):

# function to retrieve data
def get_(key: str, mapping: dict, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# get position in mapping
try:
idx = index_(key, mapping)
Expand All @@ -192,40 +194,13 @@ def get_(key: str, mapping: dict, data: dict):
msg = f"Warning ({e}) raised getting value for {key}, None returned for {key}" # noqa
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
value = None
else:
raise KeyError(msg)
return value


# function to validate mapping file against JSON schema
def validate_mapping(mapping: dict) -> bool:
"""
Validates dictionary containing mapping to BUFR against internal schema.
Returns True if the dictionary passes and raises an error otherwise.
:param mapping: dictionary containing mappings to specified BUFR
sequence using ecCodes key.
:returns: `bool` of validation result
"""
global _warnings
# load internal file schema for mappings
file_schema = f"{MAPPINGS}{os.sep}mapping_schema.json"
with open(file_schema) as fh:
schema = json.load(fh)

# now validate
try:
validate(mapping, schema)
except Exception as e:
msg = f"Warning ({e}). Invalid BUFR template mapping file: {mapping}"
raise RuntimeError(msg)

return SUCCESS


def apply_scaling(value: Union[NUMBERS], scale: Union[NUMBERS],
offset: Union[NUMBERS]) -> Union[NUMBERS]:
"""
Expand Down Expand Up @@ -270,7 +245,8 @@ def validate_value(key: str, value: Union[NUMBERS],
:returns: validated value
"""
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# TODO move this function to the class as part of set value

if value is None:
Expand All @@ -284,7 +260,7 @@ def validate_value(key: str, value: Union[NUMBERS],
msg = f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max})." # noqa
if nullify_on_fail:
LOGGER.warning(f"{msg}; Element set to missing")
_warnings.append(f"{msg}; Element set to missing")
_warnings_global[tidx].append(f"{msg}; Element set to missing")
return None
else:
raise ValueError(msg)
Expand All @@ -309,7 +285,7 @@ def __init__(self, descriptors: list,
:param table_version: version of Master Table 0 to use, default 36
"""
global _warnings
self.warnings = []
# ================================
# first create empty bufr messages
# ================================
Expand Down Expand Up @@ -380,7 +356,6 @@ def __init__(self, descriptors: list,
# ============================================

def create_template(self) -> None:
global _warnings
template = {}
template["inputDelayedDescriptorReplicationFactor"] = \
self.delayed_replications
Expand Down Expand Up @@ -450,6 +425,7 @@ def reset(self) -> None:
for key in self.dict:
self.dict[key]["value"] = None
self.bufr = None
self.warnings = []

def set_element(self, key: str, value: object) -> None:
"""
Expand All @@ -460,7 +436,6 @@ def set_element(self, key: str, value: object) -> None:
:returns: `None`
"""
global _warnings
# TODO move value validation here

if value is not None and not isinstance(value, list):
Expand All @@ -475,7 +450,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to int for {key}") # noqa
elif expected_type == "float" and not isinstance(value, float):
Expand All @@ -485,7 +460,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to float for {key}") # noqa
else:
Expand All @@ -500,7 +475,6 @@ def get_element(self, key: str) -> Any:
:returns: value of the element
"""
global _warnings
result = None
try:
# check if we want value or an attribute (indicated by ->)
Expand All @@ -514,7 +488,7 @@ def get_element(self, key: str) -> Any:
if NULLIFY_INVALID:
result = None
LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa
_warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
self.warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
else:
msg = f"Error {e} whilst fetching {key} from data."
raise RuntimeError(msg)
Expand All @@ -530,7 +504,6 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
:returns: bytes containing BUFR data
"""
global _warnings
if use_cached and (self.bufr is not None):
return self.bufr
# ===========================
Expand Down Expand Up @@ -570,7 +543,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
except CodesInternalError as e:
msg = f"Error ({e}) calling codes_set({bufr_msg}, 'pack', True). Null message returned" # noqa
LOGGER.warning(f"{msg}") # noqa
_warnings.append(f"{msg}") # noqa
self.warnings.append(f"{msg}") # noqa
codes_release(bufr_msg)
return self.bufr
except Exception as e:
Expand Down Expand Up @@ -609,7 +582,6 @@ def md5(self) -> str:
:returns: md5 of BUFR message
"""
global _warnings
return self._hash

def parse(self, data: dict, mappings: dict) -> None:
Expand All @@ -628,7 +600,6 @@ def parse(self, data: dict, mappings: dict) -> None:
# ==================================================
# Parse the data.
# ==================================================
global _warnings
for section in ("header", "data"):
for element in mappings[section]:
# get eccodes key
Expand Down Expand Up @@ -664,7 +635,7 @@ def parse(self, data: dict, mappings: dict) -> None:
except Exception as e:
if NULLIFY_INVALID:
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
_warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
self.warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
value = None
else:
# LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa
Expand Down Expand Up @@ -704,7 +675,6 @@ def get_datetime(self) -> datetime:
:returns: `datetime.datetime` of ISO8601 representation of the
characteristic date/time
"""
global _warnings
if None in [
self.get_element("typicalYear"),
self.get_element("typicalMonth"),
Expand Down Expand Up @@ -763,11 +733,14 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
:returns: iterator
"""
global _warnings
global _warnings_global
job_id = f"t-{threading.get_ident()}" # job ID based on thread
_warnings_global[job_id] = []
# ======================
# validate mapping files
# ======================
e = validate_mapping(mappings)
e = c2bt.validate_template(mappings)

if e is not SUCCESS:
raise ValueError("Invalid mappings")

Expand Down Expand Up @@ -826,7 +799,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if _delimiter not in [",", ";", "|", "\t"]:
msg = "Invalid delimiter specified in mapping template, reverting to comma ','" # noqa
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[job_id].append(msg)
_delimiter = ","
else:
_delimiter = DELIMITER
Expand Down Expand Up @@ -894,7 +867,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if NULLIFY_INVALID:
msg = f"csv read error, non ASCII data detected ({val}), skipping row" # noqa
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[job_id].append(msg)
LOGGER.debug(row)
continue
else:
Expand Down Expand Up @@ -942,7 +915,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": PASSED,
"message": "",
"errors": [],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
cksum = message.md5()
# now identifier based on WSI and observation date as identifier
Expand Down Expand Up @@ -977,7 +950,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": FAILED,
"message": "Error encoding row, BUFR set to None",
"errors": [f"{msg}\n\t\tData: {data_dict}"],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
result["_meta"] = {
"id": None,
Expand All @@ -1000,6 +973,6 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
# now yield result back to caller
yield result
# clear warnings
_warnings = []
del _warnings_global[job_id]

fh.close()
7 changes: 4 additions & 3 deletions csv2bufr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ def mappings():
@click.command('list')
@click.pass_context
def list_mappings(ctx):
for mapping in os.listdir(MAPPINGS):
msg = f"{mapping} => {MAPPINGS}{os.sep}{mapping}"
click.echo(msg)
templates = c2bt.list_templates()
click.echo(json.dumps(templates))
for tmpl in templates.items():
click.echo(json.dumps(tmpl, indent=4))


@click.command('create')
Expand Down
Loading

0 comments on commit 8be9605

Please sign in to comment.