From 211c3f268adcddf93be21508dbd3aad66b4a058a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5vard=20Berland?=
Date: Fri, 13 Dec 2019 12:46:32 +0100
Subject: [PATCH] Concurrency implementation of fmu-ensemble using
 concurrent.futures

* Batch processing after init on ensembles
* Functionality for turning off concurrency
* Concurrent apply()
* Parallelize add_from_runpathfile
* Allow running find_files at init of realizations
* Parallelize get_smry()
---
 .github/workflows/fmu-ensemble.yml       |   1 +
 src/fmu/ensemble/common.py               |  63 +++++
 src/fmu/ensemble/ensemble.py             | 294 +++++++++++++++++------
 src/fmu/ensemble/realization.py          |  68 ++++--
 src/fmu/ensemble/virtualensemble.py      |   7 +-
 tests/test_batch.py                      |  37 +++
 tests/test_ecl2df.py                     |  46 ++--
 tests/test_ensemble.py                   |  78 ++++--
 tests/test_ensembleset.py                |  19 +-
 tests/test_webviz_subsurface_testdata.py | 115 +++++++++
 10 files changed, 574 insertions(+), 154 deletions(-)
 create mode 100644 src/fmu/ensemble/common.py
 create mode 100644 tests/test_webviz_subsurface_testdata.py

diff --git a/.github/workflows/fmu-ensemble.yml b/.github/workflows/fmu-ensemble.yml
index d656179f..43c6b5d3 100644
--- a/.github/workflows/fmu-ensemble.yml
+++ b/.github/workflows/fmu-ensemble.yml
@@ -21,6 +21,7 @@ jobs:
     strategy:
       matrix:
         python-version: ['3.6', '3.7', '3.8']
+        FMU_CONCURRENCY: ['True', 'False']
 
     steps:
       - name: 📖 Checkout commit locally
diff --git a/src/fmu/ensemble/common.py b/src/fmu/ensemble/common.py
new file mode 100644
index 00000000..ba3267a4
--- /dev/null
+++ b/src/fmu/ensemble/common.py
@@ -0,0 +1,63 @@
+"""Common functions for fmu.ensemble"""
+
+import os
+import sys
+
+import six
+
+from .etc import Interaction
+
+fmux = Interaction()
+logger = fmux.basiclogger(__name__)
+
+ENV_NAME = "FMU_CONCURRENCY"
+
+
+def use_concurrent():
+    """Determine whether we should use concurrency or not
+
+    This is based on an environment variable, on the presence
+    of concurrent.futures, and on the Python version
+    (Python 2 support is deliberately not attempted)
+
+    Returns:
+        bool: True if concurrency mode should be used
+    """
+    if six.PY2:
+        # Py2-support not attempted
+        return False
+    if "concurrent.futures" in sys.modules:
+        if ENV_NAME not in os.environ:
+            return True
+        env_var = os.environ[ENV_NAME]
+        if (
+            str(env_var) == "0"
+            or str(env_var).lower() == "false"
+            or str(env_var).lower() == "no"
+        ):
+            return False
+        return True
+    # If concurrent.futures is not available to import, we end here.
+    return False
+
+
+def set_concurrent(concurrent):
+    """Set the concurrency mode used by fmu.ensemble.
+
+    This is done by modifying the environment variable
+    for the current Python process.
+
+    If concurrency is asked for but not possible, a warning
+    will be printed and the code will continue in sequential mode.
+
+    Args:
+        concurrent (bool): Set to True if concurrent mode is requested,
+            False if not.
+ """ + if isinstance(concurrent, bool): + os.environ[ENV_NAME] = str(concurrent) + else: + raise TypeError + # Check for success: + if concurrent and not use_concurrent(): + logger.warning("Unable to activate concurrent code") diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index b2604d43..c12691f3 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -4,6 +4,7 @@ import os import glob import logging +from concurrent.futures import ProcessPoolExecutor import dateutil import pandas as pd @@ -12,7 +13,6 @@ from ecl import EclDataType from ecl.eclfile import EclKW -from .etc import Interaction # noqa from .realization import ScratchRealization from .virtualrealization import VirtualRealization from .virtualensemble import VirtualEnsemble @@ -20,6 +20,7 @@ from .realization import parse_number from .util import shortcut2path from .util.dates import unionize_smry_dates +from .common import use_concurrent logger = logging.getLogger(__name__) @@ -60,8 +61,8 @@ class ScratchEnsemble(object): or relative path to a realization RUNPATH, third column is the basename of the Eclipse simulation, relative to RUNPATH. Fourth column is not used. - runpathfilter (str): If supplied, the only the runpaths in - the runpathfile which contains this string will be included + runpathfilter (str): If supplied, only the runpaths in + the runpathfile which contain this string will be included Use to select only a specific realization f.ex. autodiscovery (boolean): True by default, means that the class can try to autodiscover data in the realization. Turn @@ -180,9 +181,12 @@ def add_realizations( Args: paths (list/str): String or list of strings with wildcards to file system. Absolute or relative paths. + realidxregexp (str): Passed on to ScratchRealization init, + used to determine index from the path. autodiscovery (boolean): whether files can be attempted auto-discovered - batch (list): Batch commands sent to each realization. + batch (list): Batch commands sent to each realization for + immediate execution after initialization. Returns: count (int): Number of realizations successfully added. 
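The FMU_CONCURRENCY environment toggle defined in common.py above drives every
concurrent code path in this patch. A minimal usage sketch, illustrative only;
it assumes Python 3 and that fmu.ensemble has been imported, which pulls
concurrent.futures into sys.modules:

    import os

    import fmu.ensemble  # noqa, imports concurrent.futures as a side effect
    from fmu.ensemble.common import set_concurrent, use_concurrent

    set_concurrent(False)  # Writes FMU_CONCURRENCY="False" to os.environ
    assert os.environ["FMU_CONCURRENCY"] == "False"
    assert not use_concurrent()  # "0", "false" and "no" also disable it

    set_concurrent(True)  # Logs a warning if concurrency cannot be activated
    assert use_concurrent()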
@@ -195,13 +199,33 @@
         globbedpaths = glob.glob(paths)
 
         count = 0
-        for realdir in globbedpaths:
-            realization = ScratchRealization(
-                realdir,
-                realidxregexp=realidxregexp,
-                autodiscovery=autodiscovery,
-                batch=batch,
-            )
+        if use_concurrent():
+            # Submit all realizations to the pool before collecting any
+            # results; calling .result() directly after each submit()
+            # would serialize the loading again.
+            with ProcessPoolExecutor() as executor:
+                futures = [
+                    executor.submit(
+                        ScratchRealization,
+                        realdir,
+                        realidxregexp=realidxregexp,
+                        autodiscovery=autodiscovery,
+                        batch=batch,
+                    )
+                    for realdir in globbedpaths
+                ]
+                loaded_reals = [future.result() for future in futures]
+        else:
+            loaded_reals = [
+                ScratchRealization(
+                    realdir,
+                    realidxregexp=realidxregexp,
+                    autodiscovery=autodiscovery,
+                    batch=batch,
+                )
+                for realdir in globbedpaths
+            ]
+        for realdir, realization in zip(globbedpaths, loaded_reals):
             if realization.index is None:
                 logger.critical(
                     "Could not determine realization index for path %s", realdir
@@ -259,21 +279,49 @@
         ):
             raise ValueError("runpath dataframe not correct")
 
-        for _, row in runpath_df.iterrows():
-            if runpathfilter and runpathfilter not in row["runpath"]:
-                continue
-            logger.info("Adding realization from %s", row["runpath"])
-            realization = ScratchRealization(
-                row["runpath"],
-                index=int(row["index"]),
-                autodiscovery=False,
-                batch=batch,
+        if runpathfilter:
+            runpath_df = runpath_df[runpath_df["runpath"].str.contains(runpathfilter)]
+
+        if use_concurrent():
+            logger.info(
+                "Loading %s realizations concurrently from runpathfile",
+                str(len(runpath_df)),
+            )
+            # Submit all realizations to the pool before collecting any
+            # results, to keep every worker busy:
+            with ProcessPoolExecutor() as executor:
+                futures = [
+                    executor.submit(
+                        ScratchRealization,
+                        row.runpath,
+                        index=int(row.index),
+                        autodiscovery=False,
+                        find_files=[
+                            row.eclbase + ".DATA",
+                            row.eclbase + ".UNSMRY",
+                        ],
+                        batch=batch,
+                    )
+                    for row in runpath_df.itertuples()
+                ]
+                loaded_reals = [future.result() for future in futures]
+        else:
+            logger.info(
+                "Loading %s realizations sequentially from runpathfile",
+                str(len(runpath_df)),
             )
-            # Use the ECLBASE from the runpath file to
-            # ensure we recognize the correct UNSMRY file
-            realization.find_files(row["eclbase"] + ".DATA")
-            realization.find_files(row["eclbase"] + ".UNSMRY")
-            self.realizations[int(row["index"])] = realization
+            loaded_reals = [
+                ScratchRealization(
+                    row.runpath,
+                    index=int(row.index),
+                    autodiscovery=False,
+                    find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
+                    batch=batch,
+                )
+                for row in runpath_df.itertuples()
+            ]
+        for real in loaded_reals:
+            self.realizations[real.index] = real
 
         return len(self) - prelength
@@ -307,8 +352,11 @@ def remove_realizations(self, realindices):
             realindices = [realindices]
         popped = 0
         for index in realindices:
-            self.realizations.pop(index, None)
-            popped += 1
+            if index in self.realizations.keys():
+                self.realizations.pop(index, None)
+                popped += 1
+            else:
+                logger.warning("Can't remove realization %d, it is not there", index)
         logger.info("removed %d realization(s)", popped)
 
     def to_virtual(self, name=None):
@@ -502,18 +550,18 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals
             pd.Dataframe: with loaded data aggregated. Column 'REAL'
             distuinguishes each realizations data.
         """
-        for index, realization in self.realizations.items():
-            try:
-                realization.load_file(localpath, fformat, convert_numeric, force_reread)
-            except ValueError as exc:
-                # This would at least occur for unsupported fileformat,
-                # and that we should not skip.
- logger.critical("load_file() failed in realization %d", index) - raise ValueError from exc - except IOError: - # At ensemble level, we allow files to be missing in - # some realizations - logger.warning("Could not read %s for realization %d", localpath, index) + self.process_batch( + batch=[ + { + "load_file": { + "localpath": localpath, + "fformat": fformat, + "convert_numeric": convert_numeric, + "force_reread": force_reread, + } + } + ] + ) if self.get_df(localpath).empty: raise ValueError("No ensemble data found for {}".format(localpath)) return self.get_df(localpath) @@ -669,6 +717,10 @@ def get_df(self, localpath, merge=None): """ dflist = {} for index, realization in self.realizations.items(): + # There is probably no gain from running this concurrently + # over the realizations. Each realization object holds + # the dataframes in memory already, so retrieval + # is at no cost. try: data = realization.get_df(localpath, merge=merge) if isinstance(data, dict): @@ -762,25 +814,34 @@ def load_smry( """ if not stacked: raise NotImplementedError - # Future: Multithread this! - for realidx, realization in self.realizations.items(): - # We do not store the returned DataFrames here, - # instead we look them up afterwards using get_df() - # Downside is that we have to compute the name of the - # cached object as it is not returned. - logger.info("Loading smry from realization %s", realidx) - realization.load_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - start_date=start_date, - end_date=end_date, - include_restart=include_restart, - ) + + # process_batch() will modify each Realization object + # and add the loaded smry data to the list of internalized + # data, under a well-defined name (/unsmry--.csv) + + # Since load_smry() also should return the aggregation + # of the loaded smry data, we need to pick up this + # data and aggregate it. + self.process_batch( + batch=[ + { + "load_smry": { + "column_keys": column_keys, + "time_index": time_index, + "cache_eclsum": cache_eclsum, + "start_date": start_date, + "end_date": end_date, + "include_restart": include_restart, + } + } + ] + ) if isinstance(time_index, (list, np.ndarray)): time_index = "custom" elif time_index is None: time_index = "raw" + # Note the dependency that the load_smry() function in + # ScratchRealization will store to this key-name: return self.get_df("share/results/tables/unsmry--" + time_index + ".csv") def get_volumetric_rates(self, column_keys=None, time_index=None): @@ -921,12 +982,31 @@ def process_batch(self, batch=None): ScratchEnsemble: This ensemble object (self), for it to be picked up by ProcessPoolExecutor and pickling. 
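+
+        Example (an illustrative call; "ens" stands for any
+        ScratchEnsemble object and is not defined by this patch):
+
+            ens.process_batch(
+                batch=[{"load_smry": {"time_index": "yearly"}}]
+            )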
""" - for realization in self.realizations.values(): - realization.process_batch(batch) + if use_concurrent(): + with ProcessPoolExecutor() as executor: + real_indices = self.realizations.keys() + futures_reals = [ + executor.submit( + real.process_batch, batch, excepts=(OSError, IOError) + ) + for real in self.realizations.values() + ] + # Reassemble the realization dictionary from + # the pickled results of the ProcessPool: + self.realizations = { + r_idx: real + for (r_idx, real) in zip( + real_indices, [x.result() for x in futures_reals] + ) + } + else: + for realization in self.realizations.values(): + realization.process_batch(batch, excepts=(OSError, IOError)) + return self def apply(self, callback, **kwargs): - """Callback functionalty, apply a function to every realization + """Callback functionality, apply a function to every realization The supplied function handle will be handed over to each underlying realization object. The function supplied @@ -946,16 +1026,47 @@ def apply(self, callback, **kwargs): pd.DataFrame, aggregated result of the supplied function on each realization. """ - results = [] logger.info("Ensemble %s is running callback %s", self.name, str(callback)) - for realidx, realization in self.realizations.items(): - result = realization.apply(callback, **kwargs).copy() - # (we took a copy since we are modifying it here:) - # Todo: Avoid copy by concatenatint a dict of dataframes - # where realization index is the dict keys. - result["REAL"] = realidx - results.append(result) - return pd.concat(results, sort=False, ignore_index=True) + + # It is tempting to just call process_batch() here, but then we + # don't know how to collect the results from this particular + # apply() operation (if we enforced nonempty localpath, we could) + # > kwargs["callback"] = callback + # > ens.process_batch(batch=[{"apply": **kwargs}]) # (untested) + if use_concurrent(): + with ProcessPoolExecutor() as executor: + real_indices = self.realizations.keys() + kwargs["excepts"] = (OSError, IOError) + futures_reals = [ + executor.submit(real.apply, callback, **kwargs) + for real in self.realizations.values() + ] + # Reassemble a list of dataframes from the pickled results + # of the ProcessPool: + dframes_dict_from_apply = { + realidx: dframe + for (realidx, dframe) in zip( + real_indices, [x.result() for x in futures_reals] + ) + } + # If localpath is an argument to the apply function, we not only + # need to return the data aggregated, but should also modify + # the realization data-dictionary for each member of the ensemble + # object. 
+ if "localpath" in kwargs: + for realidx, dataframe in dframes_dict_from_apply.items(): + self.realizations[realidx].data[kwargs["localpath"]] = dataframe + dframes_from_apply = [ + dframe.assign(REAL=realidx) + for (realidx, dframe) in dframes_dict_from_apply.items() + ] + + else: + dframes_from_apply = [ + realization.apply(callback, **kwargs).assign(REAL=realidx) + for (realidx, realization) in self.realizations.items() + ] + return pd.concat(dframes_from_apply, sort=False, ignore_index=True) def get_smry_dates( self, @@ -1374,19 +1485,44 @@ def get_smry( end_date=end_date, include_restart=include_restart, ) - dflist = [] - for index, realization in self.realizations.items(): - dframe = realization.get_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - include_restart=include_restart, - ) - dframe.insert(0, "REAL", index) - dframe.index.name = "DATE" - dflist.append(dframe) - if dflist: - return pd.concat(dflist, sort=False).reset_index() + dframes = [] + if use_concurrent(): + with ProcessPoolExecutor() as executor: + real_indices = self.realizations.keys() + # Note that we cannot use process_batch() + # here as we need dataframes in return, not + # realizations. + futures_dframes = [ + executor.submit( + realization.get_smry, + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + include_restart=include_restart, + ) + for realization in self.realizations.values() + ] + # Reassemble a list of dataframes from the pickled results + # of the ProcessPool: + for realidx, dframe in zip( + real_indices, [x.result() for x in futures_dframes] + ): + dframes.append(dframe.assign(REAL=realidx).rename_axis("DATE")) + else: + # Sequential version: + for realidx, realization in self.realizations.items(): + dframes.append( + realization.get_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + include_restart=include_restart, + ) + .assign(REAL=realidx) + .rename_axis("DATE") + ) + if dframes: + return pd.concat(dframes, sort=False).reset_index() return pd.DataFrame() def get_eclgrid(self, props, report=0, agg="mean", active_only=False): diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index d748df06..07d0bba8 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -12,7 +12,7 @@ import copy import glob import json -from datetime import datetime, date, time +import datetime import dateutil import logging @@ -78,10 +78,18 @@ class ScratchRealization(object): should be run at time of initialization. Each element is a length 1 dictionary with the function name to run as the key and each keys value should be the function arguments as a dict. + find_files (list of str): Each element in this list will be given + to find_files() before any batch commands are processed. """ def __init__( - self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None + self, + path, + realidxregexp=None, + index=None, + autodiscovery=True, + batch=None, + find_files=None, ): self._origpath = os.path.abspath(path) self.index = None @@ -101,7 +109,7 @@ def __init__( self.eclfiles = None # ecl2df.EclFiles object self._eclsum = None # Placeholder for caching - self._eclsum_include_restart = None # Flag for cached object + self._eclsum_include_restart = None # The datastore for internalized data. Dictionary # indexed by filenames (local to the realization). 
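The dictionary reassembly in process_batch() and apply() above is essential:
a ProcessPoolExecutor pickles each realization object into a worker process,
so batch commands mutate copies, and the parent process only sees those
mutations through the returned objects. A self-contained sketch of this
submit-then-collect pattern (illustrative only, not fmu-ensemble code):

    from concurrent.futures import ProcessPoolExecutor


    def _mutate(item):
        """Runs in a worker process, on a pickled copy of item"""
        item["touched"] = True
        return item  # Without this, the parent's copy stays untouched


    if __name__ == "__main__":
        items = {idx: {"touched": False} for idx in range(4)}
        with ProcessPoolExecutor() as executor:
            # Submit everything before calling result() on anything:
            futures = {idx: executor.submit(_mutate, item) for idx, item in items.items()}
            items = {idx: future.result() for idx, future in futures.items()}
        assert all(item["touched"] for item in items.values())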
@@ -163,12 +171,16 @@
         if os.path.exists(os.path.join(abspath, "parameters.txt")):
             self.load_txt("parameters.txt")
 
+        if find_files is not None:
+            for to_find in find_files:
+                self.find_files(to_find)
+
         if batch:
             self.process_batch(batch)
 
         logger.info("Initialized %s", abspath)
 
-    def process_batch(self, batch):
+    def process_batch(self, batch, excepts=None):
         """Process a list of functions to run/apply
 
         This is equivalent to calling each function individually
@@ -180,6 +192,8 @@
             batch (list): Each list element is a dictionary with one key,
                 being a function names, value pr key is a dict with keyword
                 arguments to be supplied to each function.
+            excepts (tuple): Tuple of exceptions that are to be ignored in
+                each individual realization.
         Returns:
             ScratchRealization: This realization object (self), for it to be
                 picked up by ProcessPoolExecutor and pickling.
@@ -209,7 +223,15 @@
                 logger.warning("process_batch skips illegal function: %s", fn_name)
                 continue
             assert isinstance(cmd[fn_name], dict)
-            getattr(self, fn_name)(**cmd[fn_name])
+            if excepts is None:
+                getattr(self, fn_name)(**cmd[fn_name])
+            else:
+                try:
+                    getattr(self, fn_name)(**cmd[fn_name])
+                except excepts as exception:
+                    logger.info(
+                        "Ignoring exception in real %d: %s", self.index, str(exception)
+                    )
         return self
 
     def runpath(self):
@@ -540,12 +562,14 @@
             else:
                 try:
                     hms = list(map(int, jobrow["STARTTIME"].split(":")))
-                    start = datetime.combine(
-                        date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
+                    start = datetime.datetime.combine(
+                        datetime.date.today(),
+                        datetime.time(hour=hms[0], minute=hms[1], second=hms[2]),
                     )
                     hms = list(map(int, jobrow["ENDTIME"].split(":")))
-                    end = datetime.combine(
-                        date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
+                    end = datetime.datetime.combine(
+                        datetime.date.today(),
+                        datetime.time(hour=hms[0], minute=hms[1], second=hms[2]),
                     )
                     # This works also when we have crossed 00:00:00.
                     # Jobs > 24 h will be wrong.
@@ -918,6 +942,13 @@
             EclSum: object representing the summary file. None if
                 nothing was found.
         """
+        # pylint: disable=import-outside-toplevel
+        from .common import use_concurrent
+
+        if use_concurrent():
+            # In concurrent mode, caching is not used as
+            # we do not pickle the loaded EclSum objects
+            cache = False
         if cache and self._eclsum:  # Return cached object if available
             if self._eclsum_include_restart == include_restart:
                 return self._eclsum
@@ -1121,7 +1152,7 @@
         keys = set()
         for key in column_keys:
            if isinstance(key, str):
-                keys = keys.union(set(self._eclsum.keys(key)))
+                keys = keys.union(set(self.get_eclsum().keys(key)))
         return list(keys)
 
     def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None):
@@ -1135,6 +1166,8 @@ def get_smryvalues(self, props_wildcard=None):
         """
         Fetch selected vectors from Eclipse Summary data.
 
+        NOTE: This function might face deprecation.
+
         Args:
            props_wildcard : string or list of strings with vector
                wildcards
@@ -1142,24 +1175,23 @@
         Returns:
             a dataframe with values. Raw times from UNSMRY.
Empty dataframe if no summary file data available """ - if not self._eclsum: # check if it is cached - self.get_eclsum() - - if not self._eclsum: + if not self.get_eclsum(): + # Return empty, but do not store the empty dataframe in self.data return pd.DataFrame() props = self._glob_smry_keys(props_wildcard) - if "numpy_vector" in dir(self._eclsum): + if "numpy_vector" in dir(self.get_eclsum()): data = { - prop: self._eclsum.numpy_vector(prop, report_only=False) + prop: self.get_eclsum().numpy_vector(prop, report_only=False) for prop in props } else: # get_values() is deprecated in newer libecl data = { - prop: self._eclsum.get_values(prop, report_only=False) for prop in props + prop: self.get_eclsum().get_values(prop, report_only=False) + for prop in props } - dates = self._eclsum.get_dates(report_only=False) + dates = self.get_eclsum().get_dates(report_only=False) return pd.DataFrame(data=data, index=dates) def get_smry_dates( diff --git a/src/fmu/ensemble/virtualensemble.py b/src/fmu/ensemble/virtualensemble.py index 994c93d2..0499b097 100644 --- a/src/fmu/ensemble/virtualensemble.py +++ b/src/fmu/ensemble/virtualensemble.py @@ -7,6 +7,7 @@ import fnmatch import datetime import logging +import time import yaml import numpy as np @@ -641,7 +642,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False): lazy_load (bool): If True, loading of dataframes from disk will be postponed until get_df() is actually called. """ - start_time = datetime.datetime.now() + start_time = time.time() if fmt not in ["csv", "parquet"]: raise ValueError("Unknown format for from_disk: %s" % fmt) @@ -717,7 +718,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False): # IT MIGHT BE INCORRECT IF LAZY_LOAD... self.update_realindices() - end_time = datetime.datetime.now() + end_time = time.time() if lazy_load: lazy_str = "(lazy) " else: @@ -725,7 +726,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False): logger.info( "Loading ensemble from disk %stook %g seconds", lazy_str, - (end_time - start_time).total_seconds(), + end_time - start_time, ) def _load_frame_fromdisk(self, key, filename): diff --git a/tests/test_batch.py b/tests/test_batch.py index 273e13cd..5b44b569 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -2,10 +2,13 @@ import os import logging +import time import yaml +import pandas as pd from fmu.ensemble import ScratchEnsemble, EnsembleSet +from fmu.ensemble.common import use_concurrent, set_concurrent logger = logging.getLogger(__name__) @@ -78,3 +81,37 @@ def test_yaml(): assert "OK" in ensset.keys() assert "npv.txt" in ensset.keys() assert not ensset.get_df("unsmry--yearly").empty + + +def sleeper(): + """Sleeps for one second. 
+ + This function must be a module member for it to be + pickled in concurrent applications""" + time.sleep(1) + return pd.DataFrame() + + +def test_speedup(): + """Naive test of speedup in concurrent mode""" + + testdir = os.path.dirname(os.path.abspath(__file__)) + ens = ScratchEnsemble( + "reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" + ) + + set_concurrent(True) + start_time = time.time() + ens.process_batch(batch=[{"apply": {"callback": sleeper}}]) + end_time = time.time() + conc_elapsed = end_time - start_time + print("FMU_CONCURRENCY: {}".format(use_concurrent())) + print("Elapsed time for concurrent batch apply sleep: {}".format(conc_elapsed)) + + set_concurrent(False) + start_time = time.time() + ens.process_batch(batch=[{"apply": {"callback": sleeper}}]) + end_time = time.time() + seq_elapsed = end_time - start_time + print("FMU_CONCURRENCY: {}".format(use_concurrent())) + print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed)) diff --git a/tests/test_ecl2df.py b/tests/test_ecl2df.py index 0c30d13d..933b3e88 100644 --- a/tests/test_ecl2df.py +++ b/tests/test_ecl2df.py @@ -28,6 +28,18 @@ def test_ecl2df_real(): assert "KH" in compdat_df +def extract_compdat(kwargs): + """Callback function to extract compdata data using ecl2df + on a ScratchRealization""" + eclfiles = kwargs["realization"].get_eclfiles() + if not eclfiles: + print( + "Could not obtain EclFiles object for realization " + + str(kwargs["realization"].index) + ) + return ecl2df.compdat.deck2dfs(eclfiles.get_ecldeck())["COMPDAT"] + + def test_reek(): """Import the reek ensemble and apply ecl2df functions on the realizations""" @@ -41,17 +53,6 @@ def test_reek(): "reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" ) - def extract_compdat(kwargs): - """Callback fnction to extract compdata data using ecl2df - on a ScratchRealization""" - eclfiles = kwargs["realization"].get_eclfiles() - if not eclfiles: - print( - "Could not obtain EclFiles object for realization " - + str(kwargs["realization"].index) - ) - return ecl2df.compdat.deck2dfs(eclfiles.get_ecldeck())["COMPDAT"] - allcompdats = reekens.apply(extract_compdat) assert not allcompdats.empty assert 0 in allcompdats["REAL"] @@ -59,6 +60,18 @@ def extract_compdat(kwargs): # Pr. now, only realization-0 has eclipse/include in git +def get_smry(kwargs): + """Callback function to extract smry data using ecl2df on a + ScratchRealization""" + eclfiles = kwargs["realization"].get_eclfiles() + return ecl2df.summary.df( + eclfiles, + time_index=kwargs["time_index"], + column_keys=kwargs["column_keys"], + datetime=True, + ) + + def test_smry_via_ecl2df(): """Test that we could use ecl2df for smry extraction instead of the native code inside fmu-ensemble. 
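The recurring docstring remark above ("must be a module member for it to be
pickled") is why these callbacks were hoisted out of the test function bodies:
ProcessPoolExecutor pickles a callback by reference to its module path, and
locally defined functions have no such path. A short demonstration
(illustrative only, not part of the patch):

    import pickle


    def module_level():
        return 42


    def outer():
        def nested():
            return 42

        return nested


    pickle.dumps(module_level)  # Works, referenced as module.module_level
    try:
        pickle.dumps(outer())
    except (AttributeError, pickle.PicklingError) as err:
        print("Locally defined functions do not pickle: {}".format(err))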
@@ -66,17 +79,6 @@
 
     (This test code was made before fmu-ensemble used ecl2df by default)
     """
 
-    def get_smry(kwargs):
-        """Callback function to extract smry data using ecl2df on a
-        ScratchRealization"""
-        eclfiles = kwargs["realization"].get_eclfiles()
-        return ecl2df.summary.df(
-            eclfiles,
-            time_index=kwargs["time_index"],
-            column_keys=kwargs["column_keys"],
-            datetime=True,
-        )
-
     if "__file__" in globals():
         testdir = os.path.dirname(os.path.abspath(__file__))
     else:
diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py
index 75552dd2..0d7a3f5d 100644
--- a/tests/test_ensemble.py
+++ b/tests/test_ensemble.py
@@ -13,6 +13,7 @@
 from .test_ensembleset import symlink_iter
 
 from fmu.ensemble import ScratchEnsemble, ScratchRealization
+from fmu.ensemble.common import use_concurrent
 
 
 try:
@@ -325,8 +326,13 @@ def test_noautodiscovery():
     )
     # Default ensemble construction will include auto-discovery, check
     # that we got that:
-    assert not reekensemble.get_smry(column_keys="FOPT").empty
+    assert not reekensemble.load_smry(column_keys="FOPT").empty
     assert "UNSMRY" in reekensemble.files["FILETYPE"].values
+    # (Beware that get_smry() behaves differently depending on
+    # whether it is run concurrently or not: sequential runs of
+    # get_smry() discover the UNSMRY file as a side effect, while
+    # in concurrent mode the realization object where it was
+    # discovered is thrown away.)
 
     # Now try again, with no autodiscovery
     reekensemble = ScratchEnsemble(
@@ -810,6 +816,14 @@ def test_ertrunpathfile():
     # because ECLBASE is given in the runpathfile
     assert sum(["UNSMRY" in x for x in ens.files["BASENAME"].unique()]) == 5
 
+    # Run once more to test runpathfilter:
+    ens = ScratchEnsemble(
+        "filtensfromrunpath",
+        runpathfile=testdir + "/data/ert-runpath-file",
+        runpathfilter="realization-3",
+    )
+    assert len(ens) == 1
+    assert ens[3].index == 3
     os.chdir(cwd)
 
 
@@ -829,7 +843,10 @@ def test_nonexisting():
 
 
 def test_eclsumcaching():
-    """Test caching of eclsum"""
+    """Test caching of eclsum, but only if we don't use concurrency"""
+
+    if use_concurrent():
+        pytest.skip("Not testing caching when we use concurrency")
 
     if "__file__" in globals():
         # Easen up copying test code into interactive sessions
@@ -928,6 +945,26 @@ def test_read_eclgrid():
     assert len(grid_df["i"]) == 35840
 
 
+def fipnum2zone():
+    """Helper function for injecting mocked frame into
+    each realization
+
+    This function must be global to the module for
+    concurrent.futures to be able to pickle it.
+ """ + return pd.DataFrame( + columns=["FIPNUM", "ZONE"], + data=[ + [1, "UpperReek"], + [2, "MidReek"], + [3, "LowerReek"], + [4, "UpperReek"], + [5, "MidReek"], + [6, "LowerReek"], + ], + ) + + def test_get_df(): """Test the data retrieval functionality @@ -970,21 +1007,6 @@ def test_get_df(): # Inject a mocked dataframe to the realization, there is # no "add_data" API for ensembles, but we can use the apply() # functionality - def fipnum2zone(): - """Helper function for injecting mocked frame into - each realization""" - return pd.DataFrame( - columns=["FIPNUM", "ZONE"], - data=[ - [1, "UpperReek"], - [2, "MidReek"], - [3, "LowerReek"], - [4, "UpperReek"], - [5, "MidReek"], - [6, "LowerReek"], - ], - ) - ens.apply(fipnum2zone, localpath="fipnum2zone") volframe = ens.get_df("simulator_volume_fipnum", merge="fipnum2zone") @@ -1001,6 +1023,22 @@ def fipnum2zone(): assert "npv.txt" in vol_npv +# Function to be given to apply() as picklable callback: +def ex_func1(): + """Example function that will return a constant dataframe""" + return pd.DataFrame(index=["1", "2"], columns=["foo", "bar"], data=[[1, 2], [3, 4]]) + + +# Function to be given to apply() as picklable callback +def rms_vol2df(kwargs): + """Example function for bridging with fmu.tools to parse volumetrics""" + fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) + # The supplied callback should not fail too easy. + if os.path.exists(fullpath): + return volumetrics.rmsvolumetrics_txt2df(fullpath) + return pd.DataFrame() + + def test_apply(tmpdir): """ Test the callback functionality @@ -1017,12 +1055,6 @@ def test_apply(tmpdir): ens = ScratchEnsemble("reektest", "realization-*/iter-0") - def ex_func1(): - """Example function that will return a constant dataframe""" - return pd.DataFrame( - index=["1", "2"], columns=["foo", "bar"], data=[[1, 2], [3, 4]] - ) - result = ens.apply(ex_func1) assert isinstance(result, pd.DataFrame) assert "REAL" in result.columns diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index b002ffe3..658219f3 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -37,6 +37,15 @@ def symlink_iter(origensdir, newitername): ) +def rms_vol2df(kwargs): + """Callback function to be sent to ensemble objects""" + fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) + # The supplied callback should not fail too easy. + if os.path.exists(fullpath): + return volumetrics.rmsvolumetrics_txt2df(fullpath) + return pd.DataFrame() + + def test_ensembleset_reek001(tmpdir): """Test import of a stripped 5 realization ensemble, manually doubled to two identical ensembles @@ -202,15 +211,7 @@ def test_ensembleset_reek001(tmpdir): assert len(ensset3.keys()) == predel_len - 1 # Test callback functionality, that we can convert rms - # volumetrics in each realization. First we need a - # wrapper which is able to work on ScratchRealizations. - def rms_vol2df(kwargs): - """Callback function to be sent to ensemble objects""" - fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) - # The supplied callback should not fail too easy. - if os.path.exists(fullpath): - return volumetrics.rmsvolumetrics_txt2df(fullpath) - return pd.DataFrame() + # volumetrics in each realization. 
     if not SKIP_FMU_TOOLS:
         rmsvols_df = ensset3.apply(
diff --git a/tests/test_webviz_subsurface_testdata.py b/tests/test_webviz_subsurface_testdata.py
new file mode 100644
index 00000000..4e4c40d6
--- /dev/null
+++ b/tests/test_webviz_subsurface_testdata.py
@@ -0,0 +1,115 @@
+"""Testing loading the webviz subsurface testdata."""
+
+import os
+import time
+
+import pytest
+
+from fmu.ensemble import ScratchEnsemble
+from fmu.ensemble.common import use_concurrent, set_concurrent
+
+
+def check_testdata():
+    """Check if we have webviz subsurface testdata, skip if not"""
+    testdir = os.path.dirname(os.path.abspath(__file__))
+    if not os.path.exists(os.path.join(testdir, "data/webviz-subsurface-testdata")):
+        print("Skipping loading webviz-subsurface-testdata")
+        print("Do")
+        print(" $ cd tests/data")
+        print(
+            " $ git clone --depth 1 "
+            "https://github.com/equinor/webviz-subsurface-testdata"
+        )
+        print("to download and use with pytest")
+        pytest.skip()
+
+
+def test_webviz_subsurface_testdata():
+    """Check that we can load the webviz subsurface testdata"""
+
+    check_testdata()
+    set_concurrent(True)
+
+    if "__file__" in globals():
+        # Ease copying of test code into interactive sessions
+        testdir = os.path.dirname(os.path.abspath(__file__))
+    else:
+        testdir = os.path.abspath(".")
+
+    ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
+    ens = ScratchEnsemble("reek_fullmatrix", ensdir + "realization-*/iter-0")
+
+    smry_monthly = ens.load_smry()
+    assert "REAL" in smry_monthly
+    assert len(smry_monthly["REAL"].unique()) == 40
+
+    ens.load_csv("share/results/tables/relperm.csv")
+    ens.load_csv("share/results/tables/equil.csv")
+    ens.load_csv("share/results/tables/rft.csv")
+    ens.load_csv("share/results/tables/pvt.csv")
+
+    ens.load_csv("share/results/volumes/simulator_volume_fipnum.csv")
+    ens.load_csv("share/results/volumes/geogrid--oil.csv")
+    ens.load_csv("share/results/volumes/simgrid--oil.csv")
+
+    assert len(ens.keys()) == 11
+
+
+def _do_load_webviz_subsurface_testdata_batch():
+    """Example workload for testing concurrency efficiency"""
+    testdir = os.path.dirname(os.path.abspath(__file__))
+    ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
+    batch_cmds = [
+        {"load_smry": {"column_keys": "*", "time_index": "yearly"}},
+        {"load_smry": {"column_keys": "*", "time_index": "monthly"}},
+        {"load_smry": {"column_keys": "*", "time_index": "last"}},
+        {"load_smry": {"column_keys": "*", "time_index": "daily"}},
+        {"load_csv": {"localpath": "share/results/tables/relperm.csv"}},
+        {"load_csv": {"localpath": "share/results/tables/equil.csv"}},
+        {"load_csv": {"localpath": "share/results/tables/rft.csv"}},
+        {"load_csv": {"localpath": "share/results/tables/pvt.csv"}},
+        {
+            "load_csv": {
+                "localpath": "share/results/volumes/simulator_volume_fipnum.csv"
+            }
+        },
+        {"load_csv": {"localpath": "share/results/volumes/geogrid--oil.csv"}},
+        {"load_csv": {"localpath": "share/results/volumes/simgrid--oil.csv"}},
+    ]
+    ens = ScratchEnsemble(
+        "reek_fullmatrix", ensdir + "realization-*/iter-0", batch=batch_cmds
+    )
+    assert len(ens.keys()) == 3 + len(batch_cmds)  # 3 more than length of batch
+
+
+def test_webviz_subsurface_testdata_batch():
+    """Check that we can load the webviz subsurface testdata in batch
+
+    Also display timings; this should hopefully reveal that concurrent
+    operations are actually faster.
+ """ + + check_testdata() + start_time = time.time() + _do_load_webviz_subsurface_testdata_batch() + end_time = time.time() + elapsed = end_time - start_time + print("FMU_CONCURRENCY: {}".format(use_concurrent())) + print("Elapsed time for batch ensemble initialization: {}".format(elapsed)) + + +def test_webviz_subsurface_testdata_sequential_batch(): + """Check that we can load the webviz subsurface testdata in batch + + Also display timings, this should hopefully reveal that concurrent operations + are actually faster. + """ + + check_testdata() + set_concurrent(False) + start_time = time.time() + _do_load_webviz_subsurface_testdata_batch() + end_time = time.time() + elapsed = end_time - start_time + print("FMU_CONCURRENCY: {}".format(use_concurrent())) + print("Elapsed time for batch ensemble initialization: {}".format(elapsed))