Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify consumer to output paths to stored files from run command #66

Merged
merged 7 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
branches: []
paths-ignore:
- 'README.md'
workflow_dispatch:

# Specify concurrency such that only one workflow can run at a time
# * Different workflow files are not affected
Expand Down Expand Up @@ -112,7 +113,9 @@ jobs:
runs-on: ubuntu-latest
container: quay.io/condaforge/miniforge3:latest
needs: build-venv
if: contains(github.ref, 'refs/tags/v') && github.event_name == 'push'
if: |
github.event_name == 'workflow_dispatch' ||
(contains(github.ref, 'refs/tags/v') && github.event_name == 'push')

steps:
- name: Checkout repository
Expand Down
34 changes: 23 additions & 11 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import structlog
from docopt import docopt
import pathlib

from nwp_consumer import internal
from nwp_consumer.internal import config, inputs, outputs
Expand All @@ -48,7 +49,7 @@
log = structlog.getLogger()


def run(arguments: dict) -> int:
def run(arguments: dict) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
"""Run the CLI."""
# --- Map arguments to service configuration --- #

Expand Down Expand Up @@ -134,38 +135,44 @@ def run(arguments: dict) -> int:

# Logic for the "check" command
if arguments['check']:
return service.Check()
_ = service.Check()
return ([], [])

# Logic for the env command
if arguments['env']:
# Missing env vars are printed during mapping of source/sink args
return 0
return ([], [])

log.info("nwp-consumer starting", version=__version__, arguments=arguments)
log.info("nwp-consumer service starting", version=__version__, arguments=arguments)

rawFiles: list[pathlib.Path] = []
processedFiles: list[pathlib.Path] = []

if arguments['download']:
service.DownloadRawDataset(
rawFiles += service.DownloadRawDataset(
start=startDate,
end=endDate
)

if arguments['convert']:
service.ConvertRawDatasetToZarr(
processedFiles += service.ConvertRawDatasetToZarr(
start=startDate,
end=endDate
)

if arguments['consume']:
service.Check()
service.DownloadAndConvert(
r, p = service.DownloadAndConvert(
start=startDate,
end=endDate
)
rawFiles += r
processedFiles += p

if arguments['--create-latest']:
service.CreateLatestZarr()
processedFiles += service.CreateLatestZarr()

return 0
return rawFiles, processedFiles


def main() -> None:
Expand All @@ -176,7 +183,12 @@ def main() -> None:

programStartTime = dt.datetime.now()
try:
run(arguments=arguments)
files: tuple[list[pathlib.Path], list[pathlib.Path]] = run(arguments=arguments)
log.info(
event="processed files",
raw_files=len(files[0]),
processed_files=len(files[1]),
)
except Exception as e:
log.error("encountered error running nwp-consumer", error=str(e), exc_info=True)
erred = True
Expand All @@ -189,7 +201,7 @@ def main() -> None:
p.unlink(missing_ok=True)
elapsedTime = dt.datetime.now() - programStartTime
log.info(
"nwp-consumer finished",
event="nwp-consumer finished",
elapsed_time=str(elapsedTime),
version=__version__
)
Expand Down
9 changes: 5 additions & 4 deletions src/nwp_consumer/internal/inputs/icon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, model: str) -> None:
match model:
case "europe": self.baseurl += "/icon-eu/grib"
case "global": self.baseurl += "/icon/grib"
case _: raise ValueError(f"unknown icon model {model}. Valid models are 'eu' and 'global'")
case _: raise ValueError(f"unknown icon model {model}. Valid models are 'europe' and 'global'")

def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]: # noqa: D102

Expand All @@ -69,13 +69,14 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
# Files are split per parameter, level, and step, with a webpage per parameter
for param, _ in PARAMETER_RENAME_MAP.items():

if self.model == "europe" and (param == "clat" or param == "clon"):
# The latitude and longitude files are not available for the EU model
continue

# Fetch DWD webpage detailing the available files for the parameter
response = requests.get(f"{self.baseurl}/{it.strftime('%H')}/{param}/", timeout=3)

if response.status_code != 200:
if self.model == "eu" and param == "clat":
# The latitude and longitude files are not available for the EU model
continue
log.warn(
event="error fetching filelisting webpage for parameter",
status=response.status_code,
Expand Down
2 changes: 1 addition & 1 deletion src/nwp_consumer/internal/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def exists(self, *, dst: pathlib.Path) -> bool:
pass

@abc.abstractmethod
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
"""Move the given temp file to the store at path p.

:param src: Path to temp file to move
Expand Down
19 changes: 17 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,28 @@ def __init__(self, repoID: str, token: str | None = None, endpoint: str | None
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return self.__fs.exists(self.datasetPath / dst.as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int: # noqa: D102
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
self.__fs.put(
lpath=src.as_posix(),
rpath=(self.datasetPath / dst).as_posix(),
recursive=True
)
return self.__fs.du(path=(self.datasetPath / dst).as_posix())
nbytes = self.__fs.du(path=(self.datasetPath / dst).as_posix())
if nbytes != src.stat().st_size:
log.warn(
event="stored file size does not match source file size",
src=src.as_posix(),
dst=dst.as_posix(),
srcsize=src.stat().st_size,
dstsize=nbytes
)
else:
log.debug(
event="stored file",
filepath=dst.as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def test_store(self):
src = internal.TMP_DIR / f'nwpc-{uuid.uuid4()}'
src.write_bytes(bytes(filename, 'utf-8'))

n = self.client.store(src=src, dst=dst)
self.assertEqual(n, 30)
out = self.client.store(src=src, dst=dst)
self.assertEqual(out, dst)
self.assertTrue(self.mock_fs.put.called_with(src, dst))
self.assertTrue(self.mock_fs.du.called_with(dst))

Expand Down
29 changes: 19 additions & 10 deletions src/nwp_consumer/internal/outputs/localfs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
class Client(internal.StorageInterface):
"""Client for local filesystem."""

def exists(self, *, dst: pathlib.Path) -> bool:
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return dst.exists()

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int: # noqa: D102
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
if src == dst:
return os.stat(src).st_size
return dst

dst.parent.mkdir(parents=True, exist_ok=True)
shutil.move(src=src, dst=dst)
# Delete the temp file here to avoid keeping a duplicate copy of the file locally.
src.unlink(missing_ok=True)
nbytes = os.stat(dst).st_size
log.debug(
event="stored file locally",
src=src.as_posix(),
dst=dst.as_posix(),
nbytes=nbytes
)
return nbytes
if nbytes != dst.stat().st_size:
log.warn(
event="file size mismatch",
src=src.as_posix(),
dst=dst.as_posix(),
srcbytes=src.stat().st_size,
dstbytes=nbytes
)
else:
log.debug(
event="stored file locally",
src=src.as_posix(),
dst=dst.as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
# List all the inittime folders in the given directory
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/localfs/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ def test_store(self):
src.write_bytes(bytes("test_file_contents", 'utf-8'))

# Store the file using the function
size = self.testClient.store(src=src, dst=dst)
out = self.testClient.store(src=src, dst=dst)

# Assert that the file exists
self.assertTrue(dst.exists())
# Assert that the file has the correct size
self.assertEqual(size, 18)
self.assertEqual(out, dst)
# Assert that the temporary file has been deleted
self.assertFalse(src.exists())

Expand Down
35 changes: 22 additions & 13 deletions src/nwp_consumer/internal/outputs/s3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class Client(internal.StorageInterface):
__fs: s3fs.S3FileSystem

def __init__(self, key: str, secret: str, bucket: str, region: str,
endpointURL: str = None) -> None:
endpointURL: str | None = None) -> None:
"""Create a new S3Client."""
(key, secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if key is None and secret is None:
(_key, _secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if _key is None and _secret is None:
log.info(
event="attempting AWS connection using default credentials",
)
Expand All @@ -39,10 +39,10 @@ def __init__(self, key: str, secret: str, bucket: str, region: str,

self.__bucket = pathlib.Path(bucket)

def exists(self, *, dst: pathlib.Path) -> bool:
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return self.__fs.exists((self.__bucket / dst).as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
log.debug(
event="storing file in s3",
src=src.as_posix(),
Expand All @@ -52,15 +52,24 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
# Don't delete temp file as user may want to do further processing locally.
# All temp files are deleted at the end of the program.
nbytes = self.__fs.du((self.__bucket / dst).as_posix())
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return nbytes
if nbytes != src.stat().st_size:
log.warn(
event="file size mismatch",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
srcsize=src.stat().st_size,
dstsize=nbytes
)
else:
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
pathlib.Path(d).relative_to(self.__bucket / prefix)
for d in self.__fs.glob(f'{self.__bucket}/{prefix}/{internal.IT_FOLDER_GLOBSTR}')
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/s3/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_store(self):
# Write the data to the temporary file
src.write_bytes(bytes(fileName, 'utf-8'))

n = self.client.store(src=src, dst=dst)
name = self.client.store(src=src, dst=dst)

# Verify the written file in the raw directory
response = self.testS3.get_object(
Expand All @@ -117,7 +117,7 @@ def test_store(self):
self.assertEqual(response["Body"].read(), bytes(fileName, 'utf-8'))

# Verify the correct number of bytes was written
self.assertEqual(n, len(bytes(fileName, 'utf-8')))
self.assertEqual(name, dst)

# Delete the created file and the temp file
self.testS3.delete_object(
Expand Down
Loading