Skip to content

Commit

Permalink
Modify consumer to output paths to stored files from run command (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Nov 28, 2023
1 parent 21a1663 commit 87b24b4
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 100 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
branches: []
paths-ignore:
- 'README.md'
workflow_dispatch:

# Specify concurrency such that only one workflow can run at a time
# * Different workflow files are not affected
Expand Down Expand Up @@ -112,7 +113,9 @@ jobs:
runs-on: ubuntu-latest
container: quay.io/condaforge/miniforge3:latest
needs: build-venv
if: contains(github.ref, 'refs/tags/v') && github.event_name == 'push'
if: |
github.event_name == 'workflow_dispatch' ||
(contains(github.ref, 'refs/tags/v') && github.event_name == 'push')
steps:
- name: Checkout repository
Expand Down
34 changes: 23 additions & 11 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import structlog
from docopt import docopt
import pathlib

from nwp_consumer import internal
from nwp_consumer.internal import config, inputs, outputs
Expand All @@ -48,7 +49,7 @@
log = structlog.getLogger()


def run(arguments: dict) -> int:
def run(arguments: dict) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
"""Run the CLI."""
# --- Map arguments to service configuration --- #

Expand Down Expand Up @@ -134,38 +135,44 @@ def run(arguments: dict) -> int:

# Logic for the "check" command
if arguments['check']:
return service.Check()
_ = service.Check()
return ([], [])

# Logic for the env command
if arguments['env']:
# Missing env vars are printed during mapping of source/sink args
return 0
return ([], [])

log.info("nwp-consumer starting", version=__version__, arguments=arguments)
log.info("nwp-consumer service starting", version=__version__, arguments=arguments)

rawFiles: list[pathlib.Path] = []
processedFiles: list[pathlib.Path] = []

if arguments['download']:
service.DownloadRawDataset(
rawFiles += service.DownloadRawDataset(
start=startDate,
end=endDate
)

if arguments['convert']:
service.ConvertRawDatasetToZarr(
processedFiles += service.ConvertRawDatasetToZarr(
start=startDate,
end=endDate
)

if arguments['consume']:
service.Check()
service.DownloadAndConvert(
r, p = service.DownloadAndConvert(
start=startDate,
end=endDate
)
rawFiles += r
processedFiles += p

if arguments['--create-latest']:
service.CreateLatestZarr()
processedFiles += service.CreateLatestZarr()

return 0
return rawFiles, processedFiles


def main() -> None:
Expand All @@ -176,7 +183,12 @@ def main() -> None:

programStartTime = dt.datetime.now()
try:
run(arguments=arguments)
files: tuple[list[pathlib.Path], list[pathlib.Path]] = run(arguments=arguments)
log.info(
event="processed files",
raw_files=len(files[0]),
processed_files=len(files[1]),
)
except Exception as e:
log.error("encountered error running nwp-consumer", error=str(e), exc_info=True)
erred = True
Expand All @@ -189,7 +201,7 @@ def main() -> None:
p.unlink(missing_ok=True)
elapsedTime = dt.datetime.now() - programStartTime
log.info(
"nwp-consumer finished",
event="nwp-consumer finished",
elapsed_time=str(elapsedTime),
version=__version__
)
Expand Down
9 changes: 5 additions & 4 deletions src/nwp_consumer/internal/inputs/icon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, model: str) -> None:
match model:
case "europe": self.baseurl += "/icon-eu/grib"
case "global": self.baseurl += "/icon/grib"
case _: raise ValueError(f"unknown icon model {model}. Valid models are 'eu' and 'global'")
case _: raise ValueError(f"unknown icon model {model}. Valid models are 'europe' and 'global'")

def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]: # noqa: D102

Expand All @@ -69,13 +69,14 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
# Files are split per parameter, level, and step, with a webpage per parameter
for param, _ in PARAMETER_RENAME_MAP.items():

if self.model == "europe" and (param == "clat" or param == "clon"):
# The latitude and longitude files are not available for the EU model
continue

# Fetch DWD webpage detailing the available files for the parameter
response = requests.get(f"{self.baseurl}/{it.strftime('%H')}/{param}/", timeout=3)

if response.status_code != 200:
if self.model == "eu" and param == "clat":
# The latitude and longitude files are not available for the EU model
continue
log.warn(
event="error fetching filelisting webpage for parameter",
status=response.status_code,
Expand Down
2 changes: 1 addition & 1 deletion src/nwp_consumer/internal/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def exists(self, *, dst: pathlib.Path) -> bool:
pass

@abc.abstractmethod
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
"""Move the given temp file to the store at path p.
:param src: Path to temp file to move
Expand Down
19 changes: 17 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,28 @@ def __init__(self, repoID: str, token: str | None = None, endpoint: str | None
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
# Ask the client's filesystem whether dst exists beneath this client's dataset path.
return self.__fs.exists(self.datasetPath / dst.as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
    """Copy a local file into the dataset and return the path it was stored at.

    :param src: Path to the local file to upload
    :param dst: Destination path, relative to this client's dataset path
    :return: ``dst``, the dataset-relative location of the stored file
    """
    remote = (self.datasetPath / dst).as_posix()
    self.__fs.put(
        lpath=src.as_posix(),
        rpath=remote,
        recursive=True
    )

    # Compare what the filesystem reports for the destination against the
    # local source; a mismatch is logged but does not fail the store.
    nbytes = self.__fs.du(path=remote)
    srcsize = src.stat().st_size
    if nbytes != srcsize:
        log.warn(
            event="stored file size does not match source file size",
            src=src.as_posix(),
            dst=dst.as_posix(),
            srcsize=srcsize,
            dstsize=nbytes
        )
    else:
        log.debug(
            event="stored file",
            filepath=dst.as_posix(),
            nbytes=nbytes
        )
    return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def test_store(self):
src = internal.TMP_DIR / f'nwpc-{uuid.uuid4()}'
src.write_bytes(bytes(filename, 'utf-8'))

n = self.client.store(src=src, dst=dst)
self.assertEqual(n, 30)
out = self.client.store(src=src, dst=dst)
self.assertEqual(out, dst)
self.assertTrue(self.mock_fs.put.called_with(src, dst))
self.assertTrue(self.mock_fs.du.called_with(dst))

Expand Down
29 changes: 19 additions & 10 deletions src/nwp_consumer/internal/outputs/localfs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
class Client(internal.StorageInterface):
"""Client for local filesystem."""

def exists(self, *, dst: pathlib.Path) -> bool:  # noqa: D102
    # The local store is the filesystem itself, so defer to pathlib.
    return dst.exists()

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
    """Move a local file into place and return the path it was stored at.

    :param src: Path to the temp file to move
    :param dst: Path to move the file to
    :return: ``dst``, the location of the stored file
    """
    if src == dst:
        # Already in place; nothing to move.
        return dst

    # Measure the source BEFORE moving it: once shutil.move has run, src no
    # longer exists, so stat-ing it afterwards would raise FileNotFoundError,
    # and comparing dst against itself could never detect a mismatch.
    # NOTE(review): st_size is the content size only for regular files;
    # confirm whether directories are ever passed here.
    srcbytes = src.stat().st_size

    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(src=src, dst=dst)
    # Remove any leftover temp file to avoid local duplication of the file.
    src.unlink(missing_ok=True)

    dstbytes = dst.stat().st_size
    if dstbytes != srcbytes:
        log.warn(
            event="file size mismatch",
            src=src.as_posix(),
            dst=dst.as_posix(),
            srcbytes=srcbytes,
            dstbytes=dstbytes
        )
    else:
        log.debug(
            event="stored file locally",
            src=src.as_posix(),
            dst=dst.as_posix(),
            nbytes=dstbytes
        )
    return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
# List all the inittime folders in the given directory
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/localfs/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ def test_store(self):
src.write_bytes(bytes("test_file_contents", 'utf-8'))

# Store the file using the function
size = self.testClient.store(src=src, dst=dst)
out = self.testClient.store(src=src, dst=dst)

# Assert that the file exists
self.assertTrue(dst.exists())
# Assert that the file has the correct size
self.assertEqual(size, 18)
self.assertEqual(out, dst)
# Assert that the temporary file has been deleted
self.assertFalse(src.exists())

Expand Down
35 changes: 22 additions & 13 deletions src/nwp_consumer/internal/outputs/s3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class Client(internal.StorageInterface):
__fs: s3fs.S3FileSystem

def __init__(self, key: str, secret: str, bucket: str, region: str,
endpointURL: str = None) -> None:
endpointURL: str | None = None) -> None:
"""Create a new S3Client."""
(key, secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if key is None and secret is None:
(_key, _secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if _key is None and _secret is None:
log.info(
event="attempting AWS connection using default credentials",
)
Expand All @@ -39,10 +39,10 @@ def __init__(self, key: str, secret: str, bucket: str, region: str,

self.__bucket = pathlib.Path(bucket)

def exists(self, *, dst: pathlib.Path) -> bool:  # noqa: D102
    # Objects are looked up on the S3 filesystem under "<bucket>/<dst>".
    return self.__fs.exists((self.__bucket / dst).as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
log.debug(
event="storing file in s3",
src=src.as_posix(),
Expand All @@ -52,15 +52,24 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
# Don't delete temp file as user may want to do further processing locally.
# All temp files are deleted at the end of the program.
nbytes = self.__fs.du((self.__bucket / dst).as_posix())
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return nbytes
if nbytes != src.stat().st_size:
log.warn(
event="file size mismatch",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
srcsize=src.stat().st_size,
dstsize=nbytes
)
else:
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
pathlib.Path(d).relative_to(self.__bucket / prefix)
for d in self.__fs.glob(f'{self.__bucket}/{prefix}/{internal.IT_FOLDER_GLOBSTR}')
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/s3/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_store(self):
# Write the data to the temporary file
src.write_bytes(bytes(fileName, 'utf-8'))

n = self.client.store(src=src, dst=dst)
name = self.client.store(src=src, dst=dst)

# Verify the written file in the raw directory
response = self.testS3.get_object(
Expand All @@ -117,7 +117,7 @@ def test_store(self):
self.assertEqual(response["Body"].read(), bytes(fileName, 'utf-8'))

# Verify the correct number of bytes was written
self.assertEqual(n, len(bytes(fileName, 'utf-8')))
self.assertEqual(name, dst)

# Delete the created file and the temp file
self.testS3.delete_object(
Expand Down
Loading

0 comments on commit 87b24b4

Please sign in to comment.