Skip to content

Commit

Permalink
Remove func parameter from map calls
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Jan 17, 2024
1 parent df42906 commit 579d2f2
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions src/nwp_consumer/internal/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,14 @@ def DownloadRawDataset(self, *, start: dt.datetime, end: dt.datetime) -> list[pa
# Create a dask pipeline to download the files
storedFiles: list[pathlib.Path] = (
dask.bag.from_sequence(seq=newWantedFileInfos, npartitions=len(newWantedFileInfos))
.map(func=lambda fi: self.fetcher.downloadToTemp(fi=fi))
.filter(func=lambda infoPathTuple: infoPathTuple[1] != pathlib.Path())
.map(
func=lambda infoPathTuple: self.storer.store(
.map(lambda fi: self.fetcher.downloadToTemp(fi=fi))
.filter(lambda infoPathTuple: infoPathTuple[1] != pathlib.Path())
.map(lambda infoPathTuple: self.storer.store(
src=infoPathTuple[1],
dst=self.rawdir
/ infoPathTuple[0].it().strftime(internal.IT_FOLDER_FMTSTR)
/ (infoPathTuple[0].filename()),
),
pure=False,
)
)
.compute()
)
Expand Down Expand Up @@ -171,7 +169,7 @@ def ConvertRawDatasetToZarr(
# * Partition the bag by init time
bag = dask.bag.from_sequence(desiredInitTimes, npartitions=len(desiredInitTimes))
storedfiles = (
bag.map(func=lambda time: self.storer.copyITFolderToTemp(prefix=self.rawdir, it=time))
bag.map(lambda time: self.storer.copyITFolderToTemp(prefix=self.rawdir, it=time))
.filter(lambda temppaths: len(temppaths) != 0)
.map(lambda temppaths: [self.fetcher.mapTemp(p=p) for p in temppaths])
.map(lambda datasets: _mergeDatasets(datasets=datasets))
Expand Down

0 comments on commit 579d2f2

Please sign in to comment.