From f7abe42119d01d589b4fd5ae37161edb2ed065c6 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Mon, 9 Dec 2019 13:52:57 +0100 Subject: [PATCH 01/24] Concurrent loading of ensemble --- src/fmu/ensemble/ensemble.py | 63 ++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index de944d8b..e5daf94d 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -10,6 +10,11 @@ import os import glob from datetime import datetime +try: + import concurrent.futures + USE_CONCURRENT = True +except ImportError: + USE_CONCURRENT = False import six import pandas as pd @@ -205,6 +210,20 @@ def _shortcut2path(keys, shortpath): # calling function handle further errors. return shortpath + def InitScratchRealization(self, realdir): + """Wrapper function used for concurrent loading + + Args: + realdir (string): directory to realization + + Returns: + realization (ScratchRealization): Initialized ScratchRealization + """ + realization = ScratchRealization( + realdir, realidxregexp=None, autodiscovery=True + ) + return realization + def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): """Utility function to add realizations to the ensemble. @@ -232,23 +251,39 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): ) else: globbedpaths = glob.glob(paths) - + count = 0 - for realdir in globbedpaths: - realization = ScratchRealization( - realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery - ) - if realization.index is None: - logger.critical( - "Could not determine realization index " + "for path " + realdir + if USE_CONCURRENT: + with concurrent.futures.ProcessPoolExecutor() as executor: + for realization in executor.map(self.InitScratchRealization, globbedpaths): + if realization.index is None: + logger.critical( + "Could not determine realization index " + "for path " + realdir + ) + if not realidxregexp: + logger.critical("Maybe you need to supply a regexp.") + else: + logger.critical("Your regular expression is maybe wrong.") + else: + count += 1 + self._realizations[realization.index] = realization + else: + for realdir in globbedpaths: + realization = ScratchRealization( + realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery ) - if not realidxregexp: - logger.critical("Maybe you need to supply a regexp.") + if realization.index is None: + logger.critical( + "Could not determine realization index " + "for path " + realdir + ) + if not realidxregexp: + logger.critical("Maybe you need to supply a regexp.") + else: + logger.critical("Your regular expression is maybe wrong.") else: - logger.critical("Your regular expression is maybe wrong.") - else: - count += 1 - self._realizations[realization.index] = realization + count += 1 + self._realizations[realization.index] = realization + logger.info("add_realizations() found %d realizations", len(self._realizations)) return count From 06910ed279e779e97807855ce1a33315f5edd521 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 10:40:04 +0100 Subject: [PATCH 02/24] Removed some spaces --- src/fmu/ensemble/ensemble.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index e5daf94d..ca42fbc2 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -251,7 +251,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): ) else: globbedpaths = glob.glob(paths) - + count = 0 if USE_CONCURRENT: with concurrent.futures.ProcessPoolExecutor() as executor: From 37cbc426d5bc3567a5b86bd5ed5af0fff3917ddc Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 12:53:39 +0100 Subject: [PATCH 03/24] Remove double code, changed camel case --- src/fmu/ensemble/ensemble.py | 62 +++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index ca42fbc2..f70245c3 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -210,11 +210,12 @@ def _shortcut2path(keys, shortpath): # calling function handle further errors. return shortpath - def InitScratchRealization(self, realdir): + @staticmethod + def _init_scratch_realization(realdir): """Wrapper function used for concurrent loading Args: - realdir (string): directory to realization + realdir (str): directory to realization Returns: realization (ScratchRealization): Initialized ScratchRealization @@ -222,7 +223,34 @@ def InitScratchRealization(self, realdir): realization = ScratchRealization( realdir, realidxregexp=None, autodiscovery=True ) - return realization + return realdir, realization + + def _check_loading_of_realization(self, realization, realdir, realidxregexp): + """Helper function for checking and logging the results of loading a realization. + Succesfully loaded realizations will be added to self._realizations. + + Args: + realization (ScratchRealization): Initialized ScratchRealization + realdir (str): directory to realization + realidxregexp (str): Regular expression + + Returns: + int: either 0 on fail, or 1 on success + """ + return_value = 0 + if realization.index is None: + logger.critical( + "Could not determine realization index " + "for path " + realdir + ) + if not realidxregexp: + logger.critical("Maybe you need to supply a regexp.") + else: + logger.critical("Your regular expression is maybe wrong.") + else: + self._realizations[realization.index] = realization + return_value = 1 + + return return_value def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): """Utility function to add realizations to the ensemble. @@ -237,6 +265,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): Args: paths (list/str): String or list of strings with wildcards to file system. Absolute or relative paths. + realidxregexp (str): Regular expression autodiscovery (boolean): whether files can be attempted auto-discovered @@ -255,36 +284,17 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 if USE_CONCURRENT: with concurrent.futures.ProcessPoolExecutor() as executor: - for realization in executor.map(self.InitScratchRealization, globbedpaths): - if realization.index is None: - logger.critical( - "Could not determine realization index " + "for path " + realdir - ) - if not realidxregexp: - logger.critical("Maybe you need to supply a regexp.") - else: - logger.critical("Your regular expression is maybe wrong.") - else: - count += 1 - self._realizations[realization.index] = realization + for realdir, realization in executor.map(self._init_scratch_realization, globbedpaths): + count += self._check_loading_of_realization(realization=realization, realdir=realdir, realidxregexp=realidxregexp) else: for realdir in globbedpaths: realization = ScratchRealization( realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery ) - if realization.index is None: - logger.critical( - "Could not determine realization index " + "for path " + realdir - ) - if not realidxregexp: - logger.critical("Maybe you need to supply a regexp.") - else: - logger.critical("Your regular expression is maybe wrong.") - else: - count += 1 - self._realizations[realization.index] = realization + count += self._check_loading_of_realization(realization=realization, realdir=realdir, realidxregexp=realidxregexp) logger.info("add_realizations() found %d realizations", len(self._realizations)) + return count def add_from_runpathfile(self, runpath, runpathfilter=None): From eea4314e22aa3fa33c412c403c87f6b44c422e74 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 12:53:58 +0100 Subject: [PATCH 04/24] black --- src/fmu/ensemble/ensemble.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index f70245c3..d21f664a 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -10,8 +10,10 @@ import os import glob from datetime import datetime + try: import concurrent.futures + USE_CONCURRENT = True except ImportError: USE_CONCURRENT = False @@ -284,14 +286,24 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 if USE_CONCURRENT: with concurrent.futures.ProcessPoolExecutor() as executor: - for realdir, realization in executor.map(self._init_scratch_realization, globbedpaths): - count += self._check_loading_of_realization(realization=realization, realdir=realdir, realidxregexp=realidxregexp) + for realdir, realization in executor.map( + self._init_scratch_realization, globbedpaths + ): + count += self._check_loading_of_realization( + realization=realization, + realdir=realdir, + realidxregexp=realidxregexp, + ) else: for realdir in globbedpaths: realization = ScratchRealization( realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery ) - count += self._check_loading_of_realization(realization=realization, realdir=realdir, realidxregexp=realidxregexp) + count += self._check_loading_of_realization( + realization=realization, + realdir=realdir, + realidxregexp=realidxregexp, + ) logger.info("add_realizations() found %d realizations", len(self._realizations)) From d16f52551481ff1b1311f88b19f9f440df6aec67 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 12:58:22 +0100 Subject: [PATCH 05/24] Leaner writing of functional calls --- src/fmu/ensemble/ensemble.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index d21f664a..0e861bda 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -290,19 +290,13 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): self._init_scratch_realization, globbedpaths ): count += self._check_loading_of_realization( - realization=realization, - realdir=realdir, - realidxregexp=realidxregexp, + realization, realdir, realidxregexp ) else: for realdir in globbedpaths: - realization = ScratchRealization( - realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery - ) + realization = ScratchRealization(realdir, realidxregexp, autodiscovery) count += self._check_loading_of_realization( - realization=realization, - realdir=realdir, - realidxregexp=realidxregexp, + realization, realdir, realidxregexp ) logger.info("add_realizations() found %d realizations", len(self._realizations)) From 24492f991e982df5f584e7434d0ff9e800c8d3bf Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 13:00:54 +0100 Subject: [PATCH 06/24] Small change in documentation --- src/fmu/ensemble/ensemble.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 0e861bda..2149e340 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -234,7 +234,7 @@ def _check_loading_of_realization(self, realization, realdir, realidxregexp): Args: realization (ScratchRealization): Initialized ScratchRealization realdir (str): directory to realization - realidxregexp (str): Regular expression + realidxregexp (str): Regular expression or None Returns: int: either 0 on fail, or 1 on success @@ -267,7 +267,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): Args: paths (list/str): String or list of strings with wildcards to file system. Absolute or relative paths. - realidxregexp (str): Regular expression + realidxregexp (str): Regular expression or None autodiscovery (boolean): whether files can be attempted auto-discovered @@ -292,6 +292,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count += self._check_loading_of_realization( realization, realdir, realidxregexp ) + print(realidxregexp) else: for realdir in globbedpaths: realization = ScratchRealization(realdir, realidxregexp, autodiscovery) From ac061e8cfd191bc40ae8da38ad3cd3badd082cac Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 13:23:59 +0100 Subject: [PATCH 07/24] Added concurrent to load_file() --- src/fmu/ensemble/ensemble.py | 67 ++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 2149e340..fdb8349b 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -292,7 +292,6 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count += self._check_loading_of_realization( realization, realdir, realidxregexp ) - print(realidxregexp) else: for realdir in globbedpaths: realization = ScratchRealization(realdir, realidxregexp, autodiscovery) @@ -521,6 +520,43 @@ def load_csv(self, localpath, convert_numeric=True, force_reread=False): """ return self.load_file(localpath, "csv", convert_numeric, force_reread) + def _load_file( + self, realization, localpath, fformat, convert_numeric, force_reread + ): + """Wrapper function to be used for parallel loading of files + + Args: + realization: Single realization + localpath (str): path to the text file, relative to each realization + fformat (str): string identifying the file format. Supports 'txt' + and 'csv'. + convert_numeric (boolean): If set to True, numerical columns + will be searched for and have their dtype set + to integers or floats. If scalars, only numerical + data will be loaded. + force_reread (boolean): Force reread from file system. If + False, repeated calls to this function will + returned cached results. + + Returns: + Nothing + + """ + try: + realization.load_file(localpath, fformat, convert_numeric, force_reread) + except ValueError: + # This would at least occur for unsupported fileformat, + # and that we should not skip. + logger.critical("load_file() failed in realization %d", index) + raise ValueError + except IOError: + # At ensemble level, we allow files to be missing in + # some realizations + logger.warning("Could not read %s for realization %d", localpath, index) + + if self.get_df(localpath).empty: + raise ValueError("No ensemble data found for %s", localpath) + def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False): """Function for calling load_file() in every realization @@ -540,21 +576,22 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals Returns: pd.Dataframe: with loaded data aggregated. Column 'REAL' distuinguishes each realizations data. + """ - for index, realization in self._realizations.items(): - try: - realization.load_file(localpath, fformat, convert_numeric, force_reread) - except ValueError: - # This would at least occur for unsupported fileformat, - # and that we should not skip. - logger.critical("load_file() failed in realization %d", index) - raise ValueError - except IOError: - # At ensemble level, we allow files to be missing in - # some realizations - logger.warning("Could not read %s for realization %d", localpath, index) - if self.get_df(localpath).empty: - raise ValueError("No ensemble data found for %s", localpath) + if USE_CONCURRENT: + with concurrent.futures.ProcessPoolExecutor() as executor: + for realization in executor.map( + self._load_file, self._realizations.items() + ): + self._load_( + realization, localpath, fformat, convert_numeric, force_reread + ) + else: + for index, realization in self._realizations.items(): + self._load_( + realization, localpath, fformat, convert_numeric, force_reread + ) + return self.get_df(localpath) def find_files(self, paths, metadata=None, metayaml=False): From 983e92737d1727ffdf4a626daa74f4da5ffd8c7f Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 13:43:12 +0100 Subject: [PATCH 08/24] Fix static realidxregexp and autodiscovery --- src/fmu/ensemble/ensemble.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index fdb8349b..ad470259 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -213,17 +213,20 @@ def _shortcut2path(keys, shortpath): return shortpath @staticmethod - def _init_scratch_realization(realdir): + def _init_scratch_realization(realdir, realidxregexp, autodiscovery): """Wrapper function used for concurrent loading Args: realdir (str): directory to realization + realidxregexp (str): Regular expression or None + autodiscovery (boolean): whether files can be attempted + auto-discovered Returns: realization (ScratchRealization): Initialized ScratchRealization """ realization = ScratchRealization( - realdir, realidxregexp=None, autodiscovery=True + realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery ) return realdir, realization @@ -285,9 +288,10 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 if USE_CONCURRENT: + args = [(realdir, realidxregexp, autodiscovery) for realdir in globbedpaths] with concurrent.futures.ProcessPoolExecutor() as executor: for realdir, realization in executor.map( - self._init_scratch_realization, globbedpaths + self._init_scratch_realization, *zip(*args) ): count += self._check_loading_of_realization( realization, realdir, realidxregexp From 501005739a59029b1b01647751f376c93331bfff Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 13:54:50 +0100 Subject: [PATCH 09/24] Added index, minor documentation change --- src/fmu/ensemble/ensemble.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index ad470259..027da4ac 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -231,8 +231,9 @@ def _init_scratch_realization(realdir, realidxregexp, autodiscovery): return realdir, realization def _check_loading_of_realization(self, realization, realdir, realidxregexp): - """Helper function for checking and logging the results of loading a realization. - Succesfully loaded realizations will be added to self._realizations. + """Helper function for checking and logging the results of loading a + realization. Successfully loaded realizations will be added to + self._realizations. Args: realization (ScratchRealization): Initialized ScratchRealization @@ -525,7 +526,7 @@ def load_csv(self, localpath, convert_numeric=True, force_reread=False): return self.load_file(localpath, "csv", convert_numeric, force_reread) def _load_file( - self, realization, localpath, fformat, convert_numeric, force_reread + self, realization, localpath, fformat, convert_numeric, force_reread, index ): """Wrapper function to be used for parallel loading of files @@ -541,6 +542,7 @@ def _load_file( force_reread (boolean): Force reread from file system. If False, repeated calls to this function will returned cached results. + index (int): realization index Returns: Nothing @@ -583,17 +585,22 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals """ if USE_CONCURRENT: + args = [ + (realization, localpath, fformat, convert_numeric, force_reread, index) + for index, realization in self._realizations.items() + ] with concurrent.futures.ProcessPoolExecutor() as executor: - for realization in executor.map( - self._load_file, self._realizations.items() - ): - self._load_( - realization, localpath, fformat, convert_numeric, force_reread - ) + for _ in executor.map(self._load_file, *zip(*args)): + pass else: for index, realization in self._realizations.items(): - self._load_( - realization, localpath, fformat, convert_numeric, force_reread + self._load_file( + realization, + localpath, + fformat, + convert_numeric, + force_reread, + index, ) return self.get_df(localpath) From f2565fb82b3eacdb79c09ec0c9970c5ef3d53823 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 14:01:00 +0100 Subject: [PATCH 10/24] Limit concurrent.futures import to ProcessPollExecutor --- src/fmu/ensemble/ensemble.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 027da4ac..328453fb 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -12,7 +12,7 @@ from datetime import datetime try: - import concurrent.futures + from concurrent.futures import ProcessPoolExecutor USE_CONCURRENT = True except ImportError: @@ -290,7 +290,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 if USE_CONCURRENT: args = [(realdir, realidxregexp, autodiscovery) for realdir in globbedpaths] - with concurrent.futures.ProcessPoolExecutor() as executor: + with ProcessPoolExecutor() as executor: for realdir, realization in executor.map( self._init_scratch_realization, *zip(*args) ): @@ -589,7 +589,7 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals (realization, localpath, fformat, convert_numeric, force_reread, index) for index, realization in self._realizations.items() ] - with concurrent.futures.ProcessPoolExecutor() as executor: + with ProcessPoolExecutor() as executor: for _ in executor.map(self._load_file, *zip(*args)): pass else: From 184cea723839fecf41589177150dc0e08142fb4b Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 14:05:19 +0100 Subject: [PATCH 11/24] Minor documentation changes --- src/fmu/ensemble/ensemble.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 328453fb..20ed7da3 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -268,6 +268,9 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): This function passes on initialization to ScratchRealization and stores a reference to those generated objects. + This function will use multithreading when running in Python 3.2+, + via the concurrent.futures modules. + Args: paths (list/str): String or list of strings with wildcards to file system. Absolute or relative paths. @@ -566,7 +569,8 @@ def _load_file( def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False): """Function for calling load_file() in every realization - This function may utilize multithreading. + This function will use multithreading when running in Python 3.2+, + via the concurrent.futures modules. Args: localpath (str): path to the text file, relative to each realization @@ -581,7 +585,7 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals returned cached results. Returns: pd.Dataframe: with loaded data aggregated. Column 'REAL' - distuinguishes each realizations data. + distinguishes each realizations data. """ if USE_CONCURRENT: From 2b49f8bb54acb4179776dd3a9f78421356832496 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 14:53:26 +0100 Subject: [PATCH 12/24] Fixed missing positional argument names --- src/fmu/ensemble/ensemble.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 20ed7da3..ac9c779d 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -302,13 +302,14 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): ) else: for realdir in globbedpaths: - realization = ScratchRealization(realdir, realidxregexp, autodiscovery) + realization = ScratchRealization( + realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery + ) count += self._check_loading_of_realization( realization, realdir, realidxregexp ) logger.info("add_realizations() found %d realizations", len(self._realizations)) - return count def add_from_runpathfile(self, runpath, runpathfilter=None): From 58d55cf658eb2700b1be89a8b217a66ac79a1699 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 16:19:04 +0100 Subject: [PATCH 13/24] Changed argument creation for concurrent --- src/fmu/ensemble/ensemble.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index ac9c779d..ba800240 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -564,9 +564,6 @@ def _load_file( # some realizations logger.warning("Could not read %s for realization %d", localpath, index) - if self.get_df(localpath).empty: - raise ValueError("No ensemble data found for %s", localpath) - def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False): """Function for calling load_file() in every realization @@ -590,10 +587,18 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals """ if USE_CONCURRENT: - args = [ - (realization, localpath, fformat, convert_numeric, force_reread, index) - for index, realization in self._realizations.items() - ] + args = [] + for index, realization in self._realizations.items(): + args.append( + ( + realization, + localpath, + fformat, + convert_numeric, + force_reread, + index, + ) + ) with ProcessPoolExecutor() as executor: for _ in executor.map(self._load_file, *zip(*args)): pass @@ -608,6 +613,9 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals index, ) + if self.get_df(localpath).empty: + raise ValueError("No ensemble data found for %s", localpath) + return self.get_df(localpath) def find_files(self, paths, metadata=None, metayaml=False): @@ -723,6 +731,7 @@ def get_df(self, localpath): # No logging here, those error messages # should have appeared at construction using load_*() pass + if dflist: # Merge a dictionary of dataframes. The dict key is # the realization index, and end up in a MultiIndex From 08fac791e148ec75d4d1fcb5a97f8b6e12b5ed23 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 16:30:38 +0100 Subject: [PATCH 14/24] Fix for concurrent not updating self --- src/fmu/ensemble/ensemble.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index ba800240..9e3fa431 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -529,8 +529,9 @@ def load_csv(self, localpath, convert_numeric=True, force_reread=False): """ return self.load_file(localpath, "csv", convert_numeric, force_reread) + @staticmethod def _load_file( - self, realization, localpath, fformat, convert_numeric, force_reread, index + realization, localpath, fformat, convert_numeric, force_reread, index ): """Wrapper function to be used for parallel loading of files @@ -549,7 +550,7 @@ def _load_file( index (int): realization index Returns: - Nothing + realization with loaded file. """ try: @@ -564,6 +565,8 @@ def _load_file( # some realizations logger.warning("Could not read %s for realization %d", localpath, index) + return realization + def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False): """Function for calling load_file() in every realization @@ -600,8 +603,8 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals ) ) with ProcessPoolExecutor() as executor: - for _ in executor.map(self._load_file, *zip(*args)): - pass + for realization in executor.map(self._load_file, *zip(*args)): + self._realizations[realization.index] = realization else: for index, realization in self._realizations.items(): self._load_file( From 3904d2152042d0d15f3b4d447cddd47bd6bebf77 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Tue, 10 Dec 2019 20:36:04 +0100 Subject: [PATCH 15/24] Concurrent to load_smry --- src/fmu/ensemble/ensemble.py | 71 ++++++++++++++++++++++++++++-------- 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 9e3fa431..5e55dcec 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -745,6 +745,28 @@ def get_df(self, localpath): else: raise ValueError("No data found for " + localpath) + @staticmethod + def _load_smry( + realidx, + realization, + time_index, + column_keys, + cache_eclsum, + start_date, + end_date, + include_restart, + ): + logger.info("Loading smry from realization %s", realidx) + realization.load_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + start_date=start_date, + end_date=end_date, + include_restart=include_restart, + ) + return realization + def load_smry( self, time_index="raw", @@ -814,21 +836,40 @@ def load_smry( """ if not stacked: raise NotImplementedError - # Future: Multithread this! - for realidx, realization in self._realizations.items(): - # We do not store the returned DataFrames here, - # instead we look them up afterwards using get_df() - # Downside is that we have to compute the name of the - # cached object as it is not returned. - logger.info("Loading smry from realization %s", realidx) - realization.load_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - start_date=start_date, - end_date=end_date, - include_restart=include_restart, - ) + + if USE_CONCURRENT: + args = [] + for realidx, realization in self._realizations.items(): + args.append( + ( + realidx, + realization, + time_index, + column_keys, + False, + start_date, + end_date, + include_restart, + ) + ) + with ProcessPoolExecutor() as executor: + for realization in executor.map(self._load_smry, *zip(*args)): + self._realizations[realization.index] = realization + else: + for realidx, realization in self._realizations.items(): + # We do not store the returned DataFrames here, + # instead we look them up afterwards using get_df() + # Downside is that we have to compute the name of the + # cached object as it is not returned. + logger.info("Loading smry from realization %s", realidx) + realization.load_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + start_date=start_date, + end_date=end_date, + include_restart=include_restart, + ) if isinstance(time_index, list): time_index = "custom" return self.get_df("share/results/tables/unsmry--" + time_index + ".csv") From 28e9870da9e03b852f39a0f9e6dc9f90d906209f Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 12:25:53 +0100 Subject: [PATCH 16/24] Alternative way to test Python version --- src/fmu/ensemble/ensemble.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 5e55dcec..56cefd4d 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -6,16 +6,16 @@ from __future__ import division from __future__ import print_function +import sys import re import os import glob from datetime import datetime -try: +if sys.version_info >= (3, 2): from concurrent.futures import ProcessPoolExecutor - USE_CONCURRENT = True -except ImportError: +else: USE_CONCURRENT = False import six From 345ce2638fa5136029e5a8fab5b833bbe5963678 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 14:07:16 +0100 Subject: [PATCH 17/24] removing caching for parallel --- src/fmu/ensemble/ensemble.py | 16 ++++++++---- src/fmu/ensemble/ensembleset.py | 17 +++++++++--- src/fmu/ensemble/realization.py | 23 +++++++++++++++-- tests/test_ensemble.py | 46 ++++++++++++++++++--------------- 4 files changed, 70 insertions(+), 32 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 56cefd4d..ee83f942 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -14,6 +14,7 @@ if sys.version_info >= (3, 2): from concurrent.futures import ProcessPoolExecutor + USE_CONCURRENT = True else: USE_CONCURRENT = False @@ -751,7 +752,6 @@ def _load_smry( realization, time_index, column_keys, - cache_eclsum, start_date, end_date, include_restart, @@ -760,7 +760,7 @@ def _load_smry( realization.load_smry( time_index=time_index, column_keys=column_keys, - cache_eclsum=cache_eclsum, + cache_eclsum=False, start_date=start_date, end_date=end_date, include_restart=include_restart, @@ -846,7 +846,6 @@ def load_smry( realization, time_index, column_keys, - False, start_date, end_date, include_restart, @@ -1065,6 +1064,8 @@ def get_smry_dates( Returns: list of datetimes. Empty list if no data found. """ + if USE_CONCURRENT: + cache_eclsum = False # Build list of list of eclsum dates eclsumsdates = [] @@ -1169,6 +1170,7 @@ def _get_smry_dates(eclsumsdates, freq, normalize, start_date, end_date): datetimes = [start_date] + datetimes if end_date and end_date not in datetimes: datetimes = datetimes + [end_date] + return datetimes def get_smry_stats( @@ -1218,6 +1220,9 @@ def get_smry_stats( strings in the outer index are changed accordingly. If no data is found, return empty DataFrame. """ + if USE_CONCURRENT: + cache = False + if quantiles is None: quantiles = [10, 90] @@ -1253,7 +1258,7 @@ def get_smry_stats( return pd.concat(dframes, names=["STATISTIC"], sort=False) - def get_wellnames(self, well_match=None): + def get_wellnames(self, well_match=None, cache=True): """ Return a union of all Eclipse Summary well names in all realizations (union). In addition, can return a list @@ -1262,6 +1267,7 @@ def get_wellnames(self, well_match=None): Args: well_match: `Optional`. String (or list of strings) with wildcard filter. If None, all wells are returned + cache (bool): `Optional`. Bool to set caching or not. Returns: list of strings with eclipse well names. Empty list if no summary file or no matched well names. @@ -1271,7 +1277,7 @@ def get_wellnames(self, well_match=None): well_match = [well_match] result = set() for _, realization in self._realizations.items(): - eclsum = realization.get_eclsum() + eclsum = realization.get_eclsum(cache=cache) if eclsum: if well_match is None: result = result.union(set(eclsum.wells())) diff --git a/src/fmu/ensemble/ensembleset.py b/src/fmu/ensemble/ensembleset.py index e99af1d6..c7183045 100644 --- a/src/fmu/ensemble/ensembleset.py +++ b/src/fmu/ensemble/ensembleset.py @@ -6,6 +6,7 @@ from __future__ import division from __future__ import print_function +import sys import re import os import glob @@ -526,7 +527,7 @@ def load_smry( self, time_index=None, column_keys=None, - cache_eclsum=True, + cache_eclsum=False, start_date=None, end_date=None, ): @@ -626,7 +627,7 @@ def get_smry( return pd.concat(smrylist, sort=False) def get_smry_dates( - self, freq="monthly", cache_eclsum=True, start_date=None, end_date=None + self, freq="monthly", cache_eclsum=False, start_date=None, end_date=None ): """Return list of datetimes from an ensembleset @@ -658,7 +659,7 @@ def get_smry_dates( rawdates = rawdates.union( ensemble.get_smry_dates( freq="report", - cache_eclsum=cache_eclsum, + cache_eclsum=False, start_date=start_date, end_date=end_date, ) @@ -696,7 +697,15 @@ def get_wellnames(self, well_match=None): summary file or no matched well names. """ + # Caching should not be used when running concurrent + if sys.version_info >= (3, 2): + cache = False + else: + cache = True + result = set() for _, ensemble in self._ensembles.items(): - result = result.union(ensemble.get_wellnames(well_match)) + result = result.union( + ensemble.get_wellnames(well_match=well_match, cache=cache) + ) return sorted(list(result)) diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 065ffe06..00cd8f2e 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -38,6 +38,13 @@ except ImportError: HAVE_ECL2DF = False +try: + from concurrent.futures import ProcessPoolExecutor + + USE_CONCURRENT = True +except ImportError: + USE_CONCURRENT = False + from .etc import Interaction from .virtualrealization import VirtualRealization from .realizationcombination import RealizationCombination @@ -838,6 +845,13 @@ def get_eclsum(self, cache=True, include_restart=True): EclSum: object representing the summary file. None if nothing was found. """ + + if USE_CONCURRENT: + # Using caching when in concurrent mode will result + # in segementation errors from libecl. + # cache=False + pass + if cache and self._eclsum: # Return cached object if available if self._eclsum_include_restart == include_restart: return self._eclsum @@ -934,6 +948,9 @@ def load_smry( DataFrame: with summary keys as columns and dates as indices. Empty dataframe if no summary is available. """ + if USE_CONCURRENT: + cache = False + if not self.get_eclsum(cache=cache_eclsum): # Return empty, but do not store the empty dataframe in self.data return pd.DataFrame() @@ -1019,7 +1036,6 @@ def get_smry( ) else: time_index_arg = time_index - if self.get_eclsum(cache=cache_eclsum, include_restart=include_restart): try: dataframe = self.get_eclsum( @@ -1030,7 +1046,9 @@ def get_smry( return pd.DataFrame() if not cache_eclsum: # Ensure EclSum object can be garbage collected - self._eclsum = None + # Commented out otherwise segmetation error + # self._eclsum = None + pass return dataframe else: return pd.DataFrame() @@ -1237,6 +1255,7 @@ def get_smryvalues(self, props_wildcard=None): prop: self._eclsum.get_values(prop, report_only=False) for prop in props } dates = self._eclsum.get_dates(report_only=False) + return pd.DataFrame(data=data, index=dates) def get_smry_dates( diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index f3df423e..15a73284 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -5,6 +5,7 @@ from __future__ import division from __future__ import print_function +import sys import os import shutil @@ -767,35 +768,38 @@ def test_eclsumcaching(): # _eclsum variable must be None. ens.load_smry() - # Default is to do caching, so these will not be None: - assert all([x._eclsum for (idx, x) in ens._realizations.items()]) - # If we redo this operation, the same objects should all - # be None afterwards: - ens.load_smry(cache_eclsum=None) - assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) + if sys.version_info < (3, 2): + # When not using concurrent, in older Python versions, the default is + # to do caching, so these will not be None: + assert all([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry() - assert all([x._eclsum for (idx, x) in ens._realizations.items()]) + # If we redo this operation, the same objects should all + # be None afterwards: + ens.load_smry(cache_eclsum=None) + assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry(cache_eclsum=False) - assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) + ens.get_smry() + assert all([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry_stats() - assert all([x._eclsum for (idx, x) in ens._realizations.items()]) + ens.get_smry(cache_eclsum=False) + assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry_stats(cache_eclsum=False) - assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) + ens.get_smry_stats() + assert all([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry_dates() - assert all([x._eclsum for (idx, x) in ens._realizations.items()]) + ens.get_smry_stats(cache_eclsum=False) + assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) - # Clear the cached objects because the statement above has cached it.. - for _, realization in ens._realizations.items(): - realization._eclsum = None + ens.get_smry_dates() + assert all([x._eclsum for (idx, x) in ens._realizations.items()]) - ens.get_smry_dates(cache_eclsum=False) - assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) + # Clear the cached objects because the statement above has cached it.. + for _, realization in ens._realizations.items(): + realization._eclsum = None + + ens.get_smry_dates(cache_eclsum=False) + assert not any([x._eclsum for (idx, x) in ens._realizations.items()]) def test_filedescriptors(): From 7ac23bbf0aa6674a104ac4fbbdef51d1e742d73a Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 14:19:53 +0100 Subject: [PATCH 18/24] Removed unused code, black --- src/fmu/ensemble/ensemble.py | 3 --- src/fmu/ensemble/ensembleset.py | 4 +--- src/fmu/ensemble/realization.py | 16 ++++++---------- 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index ee83f942..284d3647 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -1220,9 +1220,6 @@ def get_smry_stats( strings in the outer index are changed accordingly. If no data is found, return empty DataFrame. """ - if USE_CONCURRENT: - cache = False - if quantiles is None: quantiles = [10, 90] diff --git a/src/fmu/ensemble/ensembleset.py b/src/fmu/ensemble/ensembleset.py index c7183045..df14ecd9 100644 --- a/src/fmu/ensemble/ensembleset.py +++ b/src/fmu/ensemble/ensembleset.py @@ -626,9 +626,7 @@ def get_smry( if smrylist: return pd.concat(smrylist, sort=False) - def get_smry_dates( - self, freq="monthly", cache_eclsum=False, start_date=None, end_date=None - ): + def get_smry_dates(self, freq="monthly", start_date=None, end_date=None): """Return list of datetimes from an ensembleset Datetimes from each realization in each ensemble can diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 00cd8f2e..8b0611ca 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -12,6 +12,7 @@ from __future__ import division from __future__ import print_function +import sys import os import re import copy @@ -38,17 +39,15 @@ except ImportError: HAVE_ECL2DF = False -try: - from concurrent.futures import ProcessPoolExecutor - - USE_CONCURRENT = True -except ImportError: - USE_CONCURRENT = False - from .etc import Interaction from .virtualrealization import VirtualRealization from .realizationcombination import RealizationCombination +if sys.version_info >= (3, 2): + USE_CONCURRENT = True +else: + USE_CONCURRENT = False + fmux = Interaction() logger = fmux.basiclogger(__name__) @@ -948,9 +947,6 @@ def load_smry( DataFrame: with summary keys as columns and dates as indices. Empty dataframe if no summary is available. """ - if USE_CONCURRENT: - cache = False - if not self.get_eclsum(cache=cache_eclsum): # Return empty, but do not store the empty dataframe in self.data return pd.DataFrame() From 57a3ccb21c5345d66e3f6753b3a223b59ced1043 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 14:27:28 +0100 Subject: [PATCH 19/24] Minor code fixes --- src/fmu/ensemble/realization.py | 5 +---- tests/test_ensemble.py | 30 ++++++++++++++---------------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 8b0611ca..dab5f354 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -43,10 +43,7 @@ from .virtualrealization import VirtualRealization from .realizationcombination import RealizationCombination -if sys.version_info >= (3, 2): - USE_CONCURRENT = True -else: - USE_CONCURRENT = False +USE_CONCURRENT = bool(sys.version_info >= (3, 2)) fmux = Interaction() logger = fmux.basiclogger(__name__) diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index 15a73284..fef6cab8 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -751,25 +751,23 @@ def test_nonexisting(): def test_eclsumcaching(): """Test caching of eclsum""" + if sys.version_info < (3, 2): + if "__file__" in globals(): + # Easen up copying test code into interactive sessions + testdir = os.path.dirname(os.path.abspath(__file__)) + else: + testdir = os.path.abspath(".") - if "__file__" in globals(): - # Easen up copying test code into interactive sessions - testdir = os.path.dirname(os.path.abspath(__file__)) - else: - testdir = os.path.abspath(".") - - dirs = testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" - ens = ScratchEnsemble("reektest", dirs) - - # The problem here is if you load in a lot of UNSMRY files - # and the Python process keeps them in memory. Not sure - # how to check in code that an object has been garbage collected - # but for garbage collection to work, at least the realization - # _eclsum variable must be None. + dirs = testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" + ens = ScratchEnsemble("reektest", dirs) - ens.load_smry() + # The problem here is if you load in a lot of UNSMRY files + # and the Python process keeps them in memory. Not sure + # how to check in code that an object has been garbage collected + # but for garbage collection to work, at least the realization + # _eclsum variable must be None. + ens.load_smry() - if sys.version_info < (3, 2): # When not using concurrent, in older Python versions, the default is # to do caching, so these will not be None: assert all([x._eclsum for (idx, x) in ens._realizations.items()]) From cc8df36f37aa22d30995eec26f8e591ebba94a21 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 16:30:55 +0100 Subject: [PATCH 20/24] Fix Python2.7 cach removal --- src/fmu/ensemble/realization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index dab5f354..b5bea9ae 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -1040,8 +1040,8 @@ def get_smry( if not cache_eclsum: # Ensure EclSum object can be garbage collected # Commented out otherwise segmetation error - # self._eclsum = None - pass + if not USE_CONCURRENT: + self._eclsum = None return dataframe else: return pd.DataFrame() From 32e9ec84c5e1a2c872363b20bfe386a23702e469 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Wed, 11 Dec 2019 16:45:05 +0100 Subject: [PATCH 21/24] Added pylint disable W0212 --- tests/test_ensemble.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index fef6cab8..c1c25de3 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -752,6 +752,7 @@ def test_nonexisting(): def test_eclsumcaching(): """Test caching of eclsum""" if sys.version_info < (3, 2): + # pylint: disable=W0212 if "__file__" in globals(): # Easen up copying test code into interactive sessions testdir = os.path.dirname(os.path.abspath(__file__)) From 8e76bfddda462ad028f95821c9a4b00262683846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Wed, 11 Dec 2019 20:59:33 +0100 Subject: [PATCH 22/24] rm wrapper for ScratchRealization.__init__ Seems to work identically in this way --- src/fmu/ensemble/ensemble.py | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 284d3647..b9cf859d 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -213,23 +213,6 @@ def _shortcut2path(keys, shortpath): # calling function handle further errors. return shortpath - @staticmethod - def _init_scratch_realization(realdir, realidxregexp, autodiscovery): - """Wrapper function used for concurrent loading - - Args: - realdir (str): directory to realization - realidxregexp (str): Regular expression or None - autodiscovery (boolean): whether files can be attempted - auto-discovered - - Returns: - realization (ScratchRealization): Initialized ScratchRealization - """ - realization = ScratchRealization( - realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery - ) - return realdir, realization def _check_loading_of_realization(self, realization, realdir, realidxregexp): """Helper function for checking and logging the results of loading a @@ -295,12 +278,12 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): if USE_CONCURRENT: args = [(realdir, realidxregexp, autodiscovery) for realdir in globbedpaths] with ProcessPoolExecutor() as executor: - for realdir, realization in executor.map( - self._init_scratch_realization, *zip(*args) - ): - count += self._check_loading_of_realization( - realization, realdir, realidxregexp - ) + realfutures = [executor.submit(ScratchRealization, realdir, realidxregexp=realidxregexp, + autodiscovery=autodiscovery) for realdir in globbedpaths] + + for realfuture in realfutures: + count += self._check_loading_of_realization( + realfuture.result(), "dummydirfornow", None) else: for realdir in globbedpaths: realization = ScratchRealization( From e0ac3e71db96823b68b8933885fa56eab05b90f7 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Thu, 12 Dec 2019 08:32:04 +0100 Subject: [PATCH 23/24] Minor code fixes + black --- src/fmu/ensemble/ensemble.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index b9cf859d..f868d66d 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -213,7 +213,6 @@ def _shortcut2path(keys, shortpath): # calling function handle further errors. return shortpath - def _check_loading_of_realization(self, realization, realdir, realidxregexp): """Helper function for checking and logging the results of loading a realization. Successfully loaded realizations will be added to @@ -276,14 +275,21 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 if USE_CONCURRENT: - args = [(realdir, realidxregexp, autodiscovery) for realdir in globbedpaths] with ProcessPoolExecutor() as executor: - realfutures = [executor.submit(ScratchRealization, realdir, realidxregexp=realidxregexp, - autodiscovery=autodiscovery) for realdir in globbedpaths] + realfutures = [ + executor.submit( + ScratchRealization, + realdir, + realidxregexp=realidxregexp, + autodiscovery=autodiscovery, + ) + for realdir in globbedpaths + ] - for realfuture in realfutures: + for i, realfuture in enumerate(realfutures): count += self._check_loading_of_realization( - realfuture.result(), "dummydirfornow", None) + realfuture.result(), globbedpaths[i], realidxregexp + ) else: for realdir in globbedpaths: realization = ScratchRealization( From 5909079b52d3010ad2448b293e63026ceb327bc0 Mon Sep 17 00:00:00 2001 From: "Wouter J. de Bruin" Date: Thu, 12 Dec 2019 08:38:08 +0100 Subject: [PATCH 24/24] Changed i to idx because of pylint complaining --- src/fmu/ensemble/ensemble.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index f868d66d..e36ffd3b 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -286,9 +286,9 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): for realdir in globbedpaths ] - for i, realfuture in enumerate(realfutures): + for idx, realfuture in enumerate(realfutures): count += self._check_loading_of_realization( - realfuture.result(), globbedpaths[i], realidxregexp + realfuture.result(), globbedpaths[idx], realidxregexp ) else: for realdir in globbedpaths: