diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9e755cb6..8ef0c139 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,6 +11,7 @@ on:
     branches: []
     paths-ignore:
       - 'README.md'
+  workflow_dispatch:

 # Specify concurrency such that only one workflow can run at a time
 # * Different workflow files are not affected
@@ -112,7 +113,9 @@ jobs:
     runs-on: ubuntu-latest
     container: quay.io/condaforge/miniforge3:latest
     needs: build-venv
-    if: contains(github.ref, 'refs/tags/v') && github.event_name == 'push'
+    if: |
+      github.event_name == 'workflow_dispatch' ||
+      (contains(github.ref, 'refs/tags/v') && github.event_name == 'push')

     steps:
       - name: Checkout repository

diff --git a/src/nwp_consumer/cmd/main.py b/src/nwp_consumer/cmd/main.py
index 37e86848..e9420e98 100644
--- a/src/nwp_consumer/cmd/main.py
+++ b/src/nwp_consumer/cmd/main.py
@@ -35,6 +35,7 @@

 import structlog
 from docopt import docopt
+import pathlib

 from nwp_consumer import internal
 from nwp_consumer.internal import config, inputs, outputs
@@ -48,7 +49,7 @@
 log = structlog.getLogger()


-def run(arguments: dict) -> int:
+def run(arguments: dict) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
     """Run the CLI."""
     # --- Map arguments to service configuration --- #
@@ -134,38 +135,44 @@ def run(arguments: dict):

     # Logic for the "check" command
     if arguments['check']:
-        return service.Check()
+        _ = service.Check()
+        return ([], [])

     # Logic for the env command
     if arguments['env']:
         # Missing env vars are printed during mapping of source/sink args
-        return 0
+        return ([], [])

-    log.info("nwp-consumer starting", version=__version__, arguments=arguments)
+    log.info("nwp-consumer service starting", version=__version__, arguments=arguments)
+
+    rawFiles: list[pathlib.Path] = []
+    processedFiles: list[pathlib.Path] = []

     if arguments['download']:
-        service.DownloadRawDataset(
+        rawFiles += service.DownloadRawDataset(
             start=startDate,
             end=endDate
         )

     if arguments['convert']:
-        service.ConvertRawDatasetToZarr(
+        processedFiles += service.ConvertRawDatasetToZarr(
             start=startDate,
             end=endDate
         )

     if arguments['consume']:
         service.Check()
-        service.DownloadAndConvert(
+        r, p = service.DownloadAndConvert(
             start=startDate,
             end=endDate
         )
+        rawFiles += r
+        processedFiles += p

     if arguments['--create-latest']:
-        service.CreateLatestZarr()
+        processedFiles += service.CreateLatestZarr()

-    return 0
+    return rawFiles, processedFiles


 def main() -> None:
@@ -176,7 +183,12 @@ def main() -> None:

     programStartTime = dt.datetime.now()
     try:
-        run(arguments=arguments)
+        files: tuple[list[pathlib.Path], list[pathlib.Path]] = run(arguments=arguments)
+        log.info(
+            event="processed files",
+            raw_files=len(files[0]),
+            processed_files=len(files[1]),
+        )
     except Exception as e:
         log.error("encountered error running nwp-consumer", error=str(e), exc_info=True)
         erred = True
@@ -189,7 +201,7 @@ def main() -> None:
             p.unlink(missing_ok=True)
     elapsedTime = dt.datetime.now() - programStartTime
     log.info(
-        "nwp-consumer finished",
+        event="nwp-consumer finished",
         elapsed_time=str(elapsedTime),
         version=__version__
     )

diff --git a/src/nwp_consumer/internal/inputs/icon/client.py b/src/nwp_consumer/internal/inputs/icon/client.py
index 174192ba..4d5034cd 100644
--- a/src/nwp_consumer/internal/inputs/icon/client.py
+++ b/src/nwp_consumer/internal/inputs/icon/client.py
@@ -50,7 +50,7 @@ def __init__(self, model: str) -> None:
         match model:
             case "europe": self.baseurl += "/icon-eu/grib"
             case "global": self.baseurl += "/icon/grib"
-            case _: raise ValueError(f"unknown icon model {model}. Valid models are 'eu' and 'global'")
+            case _: raise ValueError(f"unknown icon model {model}. Valid models are 'europe' and 'global'")

     def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]:  # noqa: D102
@@ -69,13 +69,14 @@

         # Files are split per parameter, level, and step, with a webpage per parameter
         for param, _ in PARAMETER_RENAME_MAP.items():
+            if self.model == "europe" and (param == "clat" or param == "clon"):
+                # The latitude and longitude files are not available for the EU model
+                continue
+
             # Fetch DWD webpage detailing the available files for the parameter
             response = requests.get(f"{self.baseurl}/{it.strftime('%H')}/{param}/", timeout=3)

             if response.status_code != 200:
-                if self.model == "eu" and param == "clat":
-                    # The latitude and longitude files are not available for the EU model
-                    continue
                 log.warn(
                     event="error fetching filelisting webpage for parameter",
                     status=response.status_code,

diff --git a/src/nwp_consumer/internal/models.py b/src/nwp_consumer/internal/models.py
index b56cc818..5353604a 100644
--- a/src/nwp_consumer/internal/models.py
+++ b/src/nwp_consumer/internal/models.py
@@ -125,7 +125,7 @@ def exists(self, *, dst: pathlib.Path) -> bool:
         pass

     @abc.abstractmethod
-    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
+    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
         """Move the given temp file to the store at path p.

         :param src: Path to temp file to move

diff --git a/src/nwp_consumer/internal/outputs/huggingface/client.py b/src/nwp_consumer/internal/outputs/huggingface/client.py
index 5d0bf33b..3dff3221 100644
--- a/src/nwp_consumer/internal/outputs/huggingface/client.py
+++ b/src/nwp_consumer/internal/outputs/huggingface/client.py
@@ -38,13 +38,28 @@ def __init__(self, repoID: str, token: str | None = None, endpoint: str | None
     def exists(self, *, dst: pathlib.Path) -> bool:  # noqa: D102
         return self.__fs.exists(self.datasetPath / dst.as_posix())

-    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:  # noqa: D102
+    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:  # noqa: D102
         self.__fs.put(
             lpath=src.as_posix(),
             rpath=(self.datasetPath / dst).as_posix(),
             recursive=True
         )
-        return self.__fs.du(path=(self.datasetPath / dst).as_posix())
+        nbytes = self.__fs.du(path=(self.datasetPath / dst).as_posix())
+        if nbytes != src.stat().st_size:
+            log.warn(
+                event="stored file size does not match source file size",
+                src=src.as_posix(),
+                dst=dst.as_posix(),
+                srcsize=src.stat().st_size,
+                dstsize=nbytes
+            )
+        else:
+            log.debug(
+                event="stored file",
+                filepath=dst.as_posix(),
+                nbytes=nbytes
+            )
+        return dst

     def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:  # noqa: D102
         allDirs = [

diff --git a/src/nwp_consumer/internal/outputs/huggingface/test_client.py b/src/nwp_consumer/internal/outputs/huggingface/test_client.py
index 54e5b3f8..3a9ce1d6 100644
--- a/src/nwp_consumer/internal/outputs/huggingface/test_client.py
+++ b/src/nwp_consumer/internal/outputs/huggingface/test_client.py
@@ -44,8 +44,8 @@ def test_store(self):
         src = internal.TMP_DIR / f'nwpc-{uuid.uuid4()}'
         src.write_bytes(bytes(filename, 'utf-8'))

-        n = self.client.store(src=src, dst=dst)
-        self.assertEqual(n, 30)
+        out = self.client.store(src=src, dst=dst)
+        self.assertEqual(out, dst)
         self.assertTrue(self.mock_fs.put.called_with(src, dst))
         self.assertTrue(self.mock_fs.du.called_with(dst))

diff --git a/src/nwp_consumer/internal/outputs/localfs/client.py b/src/nwp_consumer/internal/outputs/localfs/client.py
index f919f846..4520971a 100644
--- a/src/nwp_consumer/internal/outputs/localfs/client.py
+++ b/src/nwp_consumer/internal/outputs/localfs/client.py
@@ -13,25 +13,35 @@ class Client(internal.StorageInterface):
     """Client for local filesystem."""

-    def exists(self, *, dst: pathlib.Path) -> bool:
+    def exists(self, *, dst: pathlib.Path) -> bool:  # noqa: D102
         return dst.exists()

-    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:  # noqa: D102
+    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:  # noqa: D102
         if src == dst:
-            return os.stat(src).st_size
+            return dst
         dst.parent.mkdir(parents=True, exist_ok=True)
+        srcsize = src.stat().st_size  # capture before the move; src no longer exists afterwards
         shutil.move(src=src, dst=dst)
         # Do delete temp file here to avoid local duplication of file.
         src.unlink(missing_ok=True)
         nbytes = os.stat(dst).st_size
-        log.debug(
-            event="stored file locally",
-            src=src.as_posix(),
-            dst=dst.as_posix(),
-            nbytes=nbytes
-        )
-        return nbytes
+        if nbytes != srcsize:
+            log.warn(
+                event="file size mismatch",
+                src=src.as_posix(),
+                dst=dst.as_posix(),
+                srcbytes=srcsize,
+                dstbytes=nbytes
+            )
+        else:
+            log.debug(
+                event="stored file locally",
+                src=src.as_posix(),
+                dst=dst.as_posix(),
+                nbytes=nbytes
+            )
+        return dst

     def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:  # noqa: D102
         # List all the inittime folders in the given directory

diff --git a/src/nwp_consumer/internal/outputs/localfs/test_client.py b/src/nwp_consumer/internal/outputs/localfs/test_client.py
index 83609571..08b4a3df 100644
--- a/src/nwp_consumer/internal/outputs/localfs/test_client.py
+++ b/src/nwp_consumer/internal/outputs/localfs/test_client.py
@@ -86,12 +86,12 @@ def test_store(self):
         src.write_bytes(bytes("test_file_contents", 'utf-8'))

         # Store the file using the function
-        size = self.testClient.store(src=src, dst=dst)
+        out = self.testClient.store(src=src, dst=dst)

         # Assert that the file exists
         self.assertTrue(dst.exists())

-        # Assert that the file has the correct size
-        self.assertEqual(size, 18)
+        # Assert that the returned path is the destination path
+        self.assertEqual(out, dst)

         # Assert that the temporary file has been deleted
         self.assertFalse(src.exists())

diff --git a/src/nwp_consumer/internal/outputs/s3/client.py b/src/nwp_consumer/internal/outputs/s3/client.py
index ce98a41f..5f2da566 100644
--- a/src/nwp_consumer/internal/outputs/s3/client.py
+++ b/src/nwp_consumer/internal/outputs/s3/client.py
@@ -19,10 +19,10 @@ class Client(internal.StorageInterface):
     __fs: s3fs.S3FileSystem

     def __init__(self, key: str, secret: str, bucket: str, region: str,
-                 endpointURL: str = None) -> None:
+                 endpointURL: str | None = None) -> None:
         """Create a new S3Client."""
-        (key, secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
-        if key is None and secret is None:
+        (_key, _secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
+        if _key is None and _secret is None:
             log.info(
                 event="attempting AWS connection using default credentials",
             )
@@ -39,10 +39,10 @@

         self.__bucket = pathlib.Path(bucket)

-    def exists(self, *, dst: pathlib.Path) -> bool:
+    def exists(self, *, dst: pathlib.Path) -> bool:  # noqa: D102
         return self.__fs.exists((self.__bucket / dst).as_posix())

-    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
+    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:  # noqa: D102
         log.debug(
             event="storing file in s3",
             src=src.as_posix(),
@@ -52,15 +52,24 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:

         # Don't delete temp file as user may want to do further processing locally.
         # All temp files are deleted at the end of the program.
         nbytes = self.__fs.du((self.__bucket / dst).as_posix())
-        log.debug(
-            event="stored file in s3",
-            src=src.as_posix(),
-            dst=(self.__bucket / dst).as_posix(),
-            nbytes=nbytes
-        )
-        return nbytes
+        if nbytes != src.stat().st_size:
+            log.warn(
+                event="file size mismatch",
+                src=src.as_posix(),
+                dst=(self.__bucket / dst).as_posix(),
+                srcsize=src.stat().st_size,
+                dstsize=nbytes
+            )
+        else:
+            log.debug(
+                event="stored file in s3",
+                src=src.as_posix(),
+                dst=(self.__bucket / dst).as_posix(),
+                nbytes=nbytes
+            )
+        return dst

-    def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
+    def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:  # noqa: D102
         allDirs = [
             pathlib.Path(d).relative_to(self.__bucket / prefix)
             for d in self.__fs.glob(f'{self.__bucket}/{prefix}/{internal.IT_FOLDER_GLOBSTR}')

diff --git a/src/nwp_consumer/internal/outputs/s3/test_client.py b/src/nwp_consumer/internal/outputs/s3/test_client.py
index 6a7bc5ce..a9f6492d 100644
--- a/src/nwp_consumer/internal/outputs/s3/test_client.py
+++ b/src/nwp_consumer/internal/outputs/s3/test_client.py
@@ -107,7 +107,7 @@ def test_store(self):
         # Write the data to the temporary file
         src.write_bytes(bytes(fileName, 'utf-8'))

-        n = self.client.store(src=src, dst=dst)
+        name = self.client.store(src=src, dst=dst)

         # Verify the written file in the raw directory
         response = self.testS3.get_object(
@@ -117,7 +117,7 @@
         self.assertEqual(response["Body"].read(), bytes(fileName, 'utf-8'))

-        # Verify the correct number of bytes was written
-        self.assertEqual(n, len(bytes(fileName, 'utf-8')))
+        # Verify the path to the stored file is returned
+        self.assertEqual(name, dst)

         # Delete the created file and the temp file
         self.testS3.delete_object(

diff --git a/src/nwp_consumer/internal/service/service.py b/src/nwp_consumer/internal/service/service.py
index 94e20b9f..151cd2d3 100644
--- a/src/nwp_consumer/internal/service/service.py
+++ b/src/nwp_consumer/internal/service/service.py
@@ -16,6 +16,8 @@
 from nwp_consumer import internal

 log = structlog.getLogger()
+# Enable dask to split large chunks
+dask.config.set({"array.slicing.split_large_chunks": True})


 class NWPConsumerService:
@@ -32,7 +34,7 @@ def __init__(self, *, fetcher: internal.FetcherInterface, storer: internal.Stora
         self.rawdir = pathlib.Path(rawdir)
         self.zarrdir = pathlib.Path(zarrdir)

-    def DownloadRawDataset(self, *, start: dt.date, end: dt.date) -> int:
+    def DownloadRawDataset(self, *, start: dt.date, end: dt.date) -> list[pathlib.Path]:
         """Fetch raw data for each initTime in the given range.

        :param start: The start date of the time range to download
@@ -75,19 +77,20 @@
                  numFiles=len(newWantedFileInfos))

         # Create a dask pipeline to download the files
-        nbytes = dask.bag.from_sequence(newWantedFileInfos, npartitions=len(newWantedFileInfos)) \
+        storedFiles = dask.bag.from_sequence(
+            seq=newWantedFileInfos,
+            npartitions=len(newWantedFileInfos)) \
             .map(lambda fi: self.fetcher.downloadToTemp(fi=fi)) \
             .filter(lambda infoPathTuple: infoPathTuple[1] != pathlib.Path()) \
             .map(lambda infoPathTuple: self.storer.store(
                 src=infoPathTuple[1],
                 dst=self.rawdir/infoPathTuple[0].it().strftime(internal.IT_FOLDER_FMTSTR)/(infoPathTuple[0].filename())
             )) \
-            .sum() \
             .compute()

-        return nbytes
+        return storedFiles

-    def ConvertRawDatasetToZarr(self, *, start: dt.date, end: dt.date) -> int:
+    def ConvertRawDatasetToZarr(self, *, start: dt.date, end: dt.date) -> list[pathlib.Path]:
         """Convert raw data for the given time range to Zarr.

         :param start: The start date of the time range to convert
@@ -112,7 +115,7 @@
             log.info("no new files to convert to zarr",
                      startDate=start.strftime("%Y/%m/%d %H:%M"),
                      endDate=end.strftime("%Y/%m/%d %H:%M"))
-            return 0
+            return []
         else:
             log.info(
                 event=f"converting {len(desiredInitTimes)} init times to zarr.",
@@ -123,44 +126,45 @@
         # * Build a bag from the sequence of init times
         # * Partition the bag by init time
         bag = dask.bag.from_sequence(desiredInitTimes, npartitions=len(desiredInitTimes))
-        nbytes = bag.map(lambda time: self.storer.copyITFolderToTemp(prefix=self.rawdir, it=time)) \
+        storedfiles = bag.map(lambda time: self.storer.copyITFolderToTemp(prefix=self.rawdir, it=time)) \
            .filter(lambda temppaths: len(temppaths) != 0) \
            .map(lambda temppaths: [self.fetcher.mapTemp(p=p) for p in temppaths]) \
            .map(lambda datasets: xr.merge(objects=datasets, combine_attrs="drop_conflicts")) \
            .filter(_dataQualityFilter) \
            .map(lambda ds: _saveAsTempZipZarr(ds=ds)) \
            .map(lambda path: self.storer.store(src=path, dst=self.zarrdir / path.name)) \
-           .sum() \
            .compute(num_workers=1)  # AWS ECS only has 1 CPU which amounts to half a physical core

-        return nbytes
+        if not isinstance(storedfiles, list):
+            storedfiles = [storedfiles]

-    def DownloadAndConvert(self, *, start: dt.date, end: dt.date) -> tuple[int, int]:
+        return storedfiles
+
+    def DownloadAndConvert(self, *, start: dt.date, end: dt.date) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
         """Fetch and save as Zarr a dataset for each initTime in the given time range.

         :param start: The start date of the time range to download and convert
         :param end: The end date of the time range to download and convert
         """
-        downloadedBytes = self.DownloadRawDataset(
+        downloadedFiles = self.DownloadRawDataset(
             start=start,
             end=end
         )
-        storedBytes = self.ConvertRawDatasetToZarr(
+        convertedFiles = self.ConvertRawDatasetToZarr(
             start=start,
             end=end
         )

-        return downloadedBytes, storedBytes
+        return downloadedFiles, convertedFiles

-    def CreateLatestZarr(self) -> int:
+    def CreateLatestZarr(self) -> list[pathlib.Path]:
         """Create a Zarr file for the latest init time."""
-        nbytes = 0
         # Get the latest init time
         allInitTimes: list[dt.datetime] = self.storer.listInitTimes(prefix=self.rawdir)
         if not allInitTimes:
             log.info(event="no init times found", within=self.rawdir)
-            return nbytes
+            return []
         latestInitTime = allInitTimes[-1]

         # Load the latest init time as a dataset
@@ -183,24 +187,22 @@
         # Save as zipped zarr
         if self.storer.exists(dst=self.zarrdir / 'latest.zarr.zip'):
             self.storer.delete(p=self.zarrdir / 'latest.zarr.zip')
-        nbytes1 = datasets.map(lambda ds: _saveAsTempZipZarr(ds=ds)) \
+        storedFiles = datasets.map(lambda ds: _saveAsTempZipZarr(ds=ds)) \
             .map(lambda path: self.storer.store(src=path, dst=self.zarrdir / 'latest.zarr.zip')) \
-            .sum() \
             .compute()

         # Save as regular zarr
         if self.storer.exists(dst=self.zarrdir / 'latest.zarr'):
             self.storer.delete(p=self.zarrdir / 'latest.zarr')
-        _ = datasets.map(lambda ds: _saveAsTempRegularZarr(ds=ds)) \
+        storedFiles += datasets.map(lambda ds: _saveAsTempRegularZarr(ds=ds)) \
             .map(lambda path: self.storer.store(src=path, dst=self.zarrdir / 'latest.zarr')) \
-            .sum() \
             .compute()

         # Delete the temporary files
         for f in tempPaths:
             f.unlink(missing_ok=True)

-        return nbytes1
+        return storedFiles

     def Check(self) -> int:
         """Perform a healthcheck on the service."""

diff --git a/src/nwp_consumer/internal/service/test_service.py b/src/nwp_consumer/internal/service/test_service.py
index 69cbbf93..a339c695 100644
--- a/src/nwp_consumer/internal/service/test_service.py
+++ b/src/nwp_consumer/internal/service/test_service.py
@@ -26,12 +26,13 @@ def exists(self, *, dst: pathlib.Path) -> bool:
             return True
         return False

-    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
-        if src.is_dir():
-            shutil.rmtree(src.as_posix(), ignore_errors=True)
-        else:
-            src.unlink(missing_ok=True)
-        return len(dst.name)
+    def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
+        if src.exists():
+            if src.is_dir():
+                shutil.rmtree(src.as_posix(), ignore_errors=True)
+            else:
+                src.unlink(missing_ok=True)
+        return dst

     def listInitTimes(self, prefix: pathlib.Path) -> list[dt.datetime]:
         return testInitTimes
@@ -40,8 +41,12 @@ def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
             -> list[pathlib.Path]:
         return [pathlib.Path(f'{it:%Y%m%d%H%M}/{f}.grib') for f in INIT_TIME_FILES]

-    def delete(self, *, dst: pathlib.Path) -> None:
-        pass
+    def delete(self, *, p: pathlib.Path) -> None:
+        if p.exists():
+            if p.is_dir():
+                shutil.rmtree(p.as_posix(), ignore_errors=True)
+            else:
+                p.unlink(missing_ok=True)


 class DummyFileInfo(internal.FileInfoModel):
@@ -95,6 +100,8 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:

 class TestNWPConsumerService(unittest.TestCase):

+    service: NWPConsumerService
+
     @classmethod
     def setUpClass(cls) -> None:
         testStorer = DummyStorer()
@@ -112,25 +119,25 @@ def test_downloadRawDataset(self):
         startDate = testInitTimes[0].date()
         endDate = testInitTimes[-1].date()

-        n = self.service.DownloadRawDataset(start=startDate, end=endDate)
+        files = self.service.DownloadRawDataset(start=startDate, end=endDate)

         # 2 files per init time, all init times
-        self.assertEqual(2 * len(INIT_HOURS) * (len(DAYS)) * len("xxxxx.grib"), n)
+        self.assertEqual(2 * len(INIT_HOURS) * (len(DAYS)), len(files))

     def test_convertRawDataset(self):
         startDate = testInitTimes[0].date()
         endDate = testInitTimes[-1].date()

-        n = self.service.ConvertRawDatasetToZarr(start=startDate, end=endDate)
+        files = self.service.ConvertRawDatasetToZarr(start=startDate, end=endDate)

         # 1 Dataset per init time, all init times per day, all days
-        filesize = len(dt.datetime.now().strftime(internal.ZARR_FMTSTR.split("/")[-1]) + ".zarr.zip")
-        self.assertEqual(1 * len(INIT_HOURS) * (len(DAYS)) * filesize, n)
+        self.assertEqual(1 * len(INIT_HOURS) * (len(DAYS)), len(files))

     def test_createLatestZarr(self):
-        n1 = self.service.CreateLatestZarr()
-        self.assertEqual(len("latest.zarr.zip"), n1)
+        files = self.service.CreateLatestZarr()
+        # 1 zarr, 1 zipped zarr
+        self.assertEqual(2, len(files))


 # ------------ Static Methods ----------- #

diff --git a/src/test_integration/test_inputs_integration.py b/src/test_integration/test_inputs_integration.py
index 310251ad..dcd51ad9 100644
--- a/src/test_integration/test_inputs_integration.py
+++ b/src/test_integration/test_inputs_integration.py
@@ -150,3 +150,4 @@ def test_getsFileInfosFromICON(self):

 if __name__ == '__main__':
     unittest.main()
+

diff --git a/src/test_integration/test_service_integration.py b/src/test_integration/test_service_integration.py
index 9eeb621b..951756f3 100644
--- a/src/test_integration/test_service_integration.py
+++ b/src/test_integration/test_service_integration.py
@@ -42,11 +42,11 @@ def setUp(self) -> None:
     def test_downloadAndConvertDataset(self):
         initTime: dt.date = dt.datetime.now().date()

-        nbytes = self.testService.DownloadRawDataset(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.DownloadRawDataset(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

-        nbytes = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

         for path in pathlib.Path(self.zarrdir).glob(ZARR_GLOBSTR + '.zarr.zip'):
@@ -93,11 +93,11 @@ def setUp(self) -> None:
     def test_downloadAndConvertDataset(self):
         initTime: dt.date = dt.date(year=2022, month=1, day=1)

-        nbytes = self.testService.DownloadRawDataset(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.DownloadRawDataset(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

-        nbytes = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

         for path in pathlib.Path(self.zarrdir).glob(ZARR_GLOBSTR + '.zarr.zip'):
             ds = xr.open_zarr(store=f"zip::{path.as_posix()}").compute()
@@ -138,11 +138,11 @@ def setUp(self):
     def test_downloadAndConvertDataset(self):
         initTime: dt.date = dt.date(year=2022, month=1, day=1)

-        nbytes = self.testService.DownloadRawDataset(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.DownloadRawDataset(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

-        nbytes = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

         for path in pathlib.Path(self.zarrdir).glob(ZARR_GLOBSTR + '.zarr.zip'):
             ds = xr.open_zarr(store=f"zip::{path.as_posix()}").compute()
@@ -168,7 +168,7 @@ def setUp(self) -> None:
         storageClient = outputs.localfs.Client()
         env = config.ICONEnv()
         iconClient = inputs.icon.Client(
-            model="global",
+            model="global"
         )

         self.rawdir = 'data/ic_raw'
@@ -184,11 +184,11 @@
     def test_downloadAndConvertDataset(self):
         initTime: dt.date = dt.datetime.now().date()

-        nbytes = self.testService.DownloadRawDataset(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.DownloadRawDataset(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

-        nbytes = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
-        self.assertGreater(nbytes, 0)
+        out = self.testService.ConvertRawDatasetToZarr(start=initTime, end=initTime)
+        self.assertGreater(len(out), 0)

         for path in pathlib.Path(self.zarrdir).glob(ZARR_GLOBSTR + '.zarr.zip'):
             ds = xr.open_zarr(store=f"zip::{path.as_posix()}").compute()
@@ -205,3 +205,4 @@ def test_downloadAndConvertDataset(self):

         shutil.rmtree(self.rawdir)
         shutil.rmtree(self.zarrdir)
+