diff --git a/.github/workflows/fmu-ensemble.yml b/.github/workflows/fmu-ensemble.yml
index d656179f..43c6b5d3 100644
--- a/.github/workflows/fmu-ensemble.yml
+++ b/.github/workflows/fmu-ensemble.yml
@@ -21,6 +21,7 @@ jobs:
strategy:
matrix:
python-version: ['3.6', '3.7', '3.8']
+ FMU_CONCURRENCY: ['True', 'False']
steps:
- name: 📖 Checkout commit locally
diff --git a/src/fmu/ensemble/common.py b/src/fmu/ensemble/common.py
new file mode 100644
index 00000000..ba3267a4
--- /dev/null
+++ b/src/fmu/ensemble/common.py
@@ -0,0 +1,63 @@
+"""Common functions for fmu.ensemble"""
+
+import os
+import sys
+
+import six
+
+from .etc import Interaction
+
+fmux = Interaction()
+logger = fmux.basiclogger(__name__)
+
+ENV_NAME = "FMU_CONCURRENCY"
+
+
+def use_concurrent():
+ """Determine whether we should use concurrency or not
+
+ This is based on both an environment variable
+ and presence of concurrent.futures, and on Python version
+ (Py2 deliberately not attempted to support)
+
+ Returns:
+ bool: True if concurrency mode should be used
+ """
+ if six.PY2:
+ # Py2-support not attempted
+ return False
+ if "concurrent.futures" in sys.modules:
+ if ENV_NAME not in os.environ:
+ return True
+ env_var = os.environ[ENV_NAME]
+ if (
+ str(env_var) == "0"
+ or str(env_var).lower() == "false"
+ or str(env_var).lower() == "no"
+ ):
+ return False
+ return True
+ # If concurrent.futures is not available to import, we end here.
+ return False
+
+
+def set_concurrent(concurrent):
+ """Set the concurrency mode used by fmu.ensemble.
+
+    This is done by modifying the environment variable
+    for the current Python process.
+
+    If concurrency is asked for but not possible, a warning
+    will be printed and the code will continue in sequential mode.
+
+ Args:
+ concurrent (bool): Set to True if concurrent mode is requested,
+ False if not.
+ """
+ if isinstance(concurrent, bool):
+ os.environ[ENV_NAME] = str(concurrent)
+ else:
+        raise TypeError("The concurrent argument must be a bool")
+ # Check for success:
+ if concurrent and not use_concurrent():
+ logger.warning("Unable to activate concurrent code")
diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py
index b2604d43..c12691f3 100644
--- a/src/fmu/ensemble/ensemble.py
+++ b/src/fmu/ensemble/ensemble.py
@@ -4,6 +4,7 @@
import os
import glob
import logging
+from concurrent.futures import ProcessPoolExecutor
import dateutil
import pandas as pd
@@ -12,7 +13,6 @@
from ecl import EclDataType
from ecl.eclfile import EclKW
-from .etc import Interaction # noqa
from .realization import ScratchRealization
from .virtualrealization import VirtualRealization
from .virtualensemble import VirtualEnsemble
@@ -20,6 +20,7 @@
from .realization import parse_number
from .util import shortcut2path
from .util.dates import unionize_smry_dates
+from .common import use_concurrent
logger = logging.getLogger(__name__)
@@ -60,8 +61,8 @@ class ScratchEnsemble(object):
or relative path to a realization RUNPATH, third column is
the basename of the Eclipse simulation, relative to RUNPATH.
Fourth column is not used.
- runpathfilter (str): If supplied, the only the runpaths in
- the runpathfile which contains this string will be included
+ runpathfilter (str): If supplied, only the runpaths in
+ the runpathfile which contain this string will be included
Use to select only a specific realization f.ex.
autodiscovery (boolean): True by default, means that the class
can try to autodiscover data in the realization. Turn
@@ -180,9 +181,12 @@ def add_realizations(
Args:
paths (list/str): String or list of strings with wildcards
to file system. Absolute or relative paths.
+ realidxregexp (str): Passed on to ScratchRealization init,
+ used to determine index from the path.
autodiscovery (boolean): whether files can be attempted
auto-discovered
- batch (list): Batch commands sent to each realization.
+ batch (list): Batch commands sent to each realization for
+ immediate execution after initialization.
Returns:
count (int): Number of realizations successfully added.
@@ -195,13 +199,29 @@ def add_realizations(
globbedpaths = glob.glob(paths)
count = 0
- for realdir in globbedpaths:
- realization = ScratchRealization(
- realdir,
- realidxregexp=realidxregexp,
- autodiscovery=autodiscovery,
- batch=batch,
- )
+ if use_concurrent():
+ with ProcessPoolExecutor() as executor:
+            futures_reals = [
+                executor.submit(
+                    ScratchRealization,
+                    realdir,
+                    realidxregexp=realidxregexp,
+                    autodiscovery=autodiscovery,
+                    batch=batch,
+                )
+                for realdir in globbedpaths
+            ]
+            # Collect results only after all tasks have been submitted,
+            # otherwise the pool would effectively run sequentially:
+            loaded_reals = [future.result() for future in futures_reals]
+ else:
+ loaded_reals = [
+ ScratchRealization(
+ realdir,
+ realidxregexp=realidxregexp,
+ autodiscovery=autodiscovery,
+ batch=batch,
+ )
+ for realdir in globbedpaths
+ ]
+ for realdir, realization in zip(globbedpaths, loaded_reals):
if realization.index is None:
logger.critical(
"Could not determine realization index for path %s", realdir
@@ -259,21 +279,46 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
):
raise ValueError("runpath dataframe not correct")
- for _, row in runpath_df.iterrows():
- if runpathfilter and runpathfilter not in row["runpath"]:
- continue
- logger.info("Adding realization from %s", row["runpath"])
- realization = ScratchRealization(
- row["runpath"],
- index=int(row["index"]),
- autodiscovery=False,
- batch=batch,
+ if runpathfilter:
+ runpath_df = runpath_df[runpath_df["runpath"].str.contains(runpathfilter)]
+
+ if use_concurrent():
+ logger.info(
+ "Loading %s realizations concurrently from runpathfile",
+ str(len(runpath_df)),
+ )
+ with ProcessPoolExecutor() as executor:
+            futures_reals = [
+                executor.submit(
+                    ScratchRealization,
+                    row.runpath,
+                    index=int(row.index),
+                    autodiscovery=False,
+                    find_files=[
+                        row.eclbase + ".DATA",
+                        row.eclbase + ".UNSMRY",
+                    ],
+                    batch=batch,
+                )
+                for row in runpath_df.itertuples()
+            ]
+            # Gather results after all submissions to keep the pool busy:
+            loaded_reals = [future.result() for future in futures_reals]
+ else:
+ logger.info(
+ "Loading %s realizations sequentially from runpathfile",
+ str(len(runpath_df)),
)
- # Use the ECLBASE from the runpath file to
- # ensure we recognize the correct UNSMRY file
- realization.find_files(row["eclbase"] + ".DATA")
- realization.find_files(row["eclbase"] + ".UNSMRY")
- self.realizations[int(row["index"])] = realization
+ loaded_reals = [
+ ScratchRealization(
+ row.runpath,
+ index=int(row.index),
+ autodiscovery=False,
+ find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
+ batch=batch,
+ )
+ for row in runpath_df.itertuples()
+ ]
+ for real in loaded_reals:
+ self.realizations[real.index] = real
return len(self) - prelength
@@ -307,8 +352,11 @@ def remove_realizations(self, realindices):
realindices = [realindices]
popped = 0
for index in realindices:
- self.realizations.pop(index, None)
- popped += 1
+ if index in self.realizations.keys():
+ self.realizations.pop(index, None)
+ popped += 1
+ else:
+ logger.warning("Can't remove realization %d, it is not there", index)
logger.info("removed %d realization(s)", popped)
def to_virtual(self, name=None):
@@ -502,18 +550,18 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals
pd.Dataframe: with loaded data aggregated. Column 'REAL'
distuinguishes each realizations data.
"""
- for index, realization in self.realizations.items():
- try:
- realization.load_file(localpath, fformat, convert_numeric, force_reread)
- except ValueError as exc:
- # This would at least occur for unsupported fileformat,
- # and that we should not skip.
- logger.critical("load_file() failed in realization %d", index)
- raise ValueError from exc
- except IOError:
- # At ensemble level, we allow files to be missing in
- # some realizations
- logger.warning("Could not read %s for realization %d", localpath, index)
+ self.process_batch(
+ batch=[
+ {
+ "load_file": {
+ "localpath": localpath,
+ "fformat": fformat,
+ "convert_numeric": convert_numeric,
+ "force_reread": force_reread,
+ }
+ }
+ ]
+ )
if self.get_df(localpath).empty:
raise ValueError("No ensemble data found for {}".format(localpath))
return self.get_df(localpath)
@@ -669,6 +717,10 @@ def get_df(self, localpath, merge=None):
"""
dflist = {}
for index, realization in self.realizations.items():
+            # There is probably no gain from running this concurrently
+            # over the realizations. Each realization object already holds
+            # its dataframes in memory, so retrieval is essentially free.
try:
data = realization.get_df(localpath, merge=merge)
if isinstance(data, dict):
@@ -762,25 +814,34 @@ def load_smry(
"""
if not stacked:
raise NotImplementedError
- # Future: Multithread this!
- for realidx, realization in self.realizations.items():
- # We do not store the returned DataFrames here,
- # instead we look them up afterwards using get_df()
- # Downside is that we have to compute the name of the
- # cached object as it is not returned.
- logger.info("Loading smry from realization %s", realidx)
- realization.load_smry(
- time_index=time_index,
- column_keys=column_keys,
- cache_eclsum=cache_eclsum,
- start_date=start_date,
- end_date=end_date,
- include_restart=include_restart,
- )
+
+ # process_batch() will modify each Realization object
+ # and add the loaded smry data to the list of internalized
+    # data, under a well-defined name
+    # (share/results/tables/unsmry--<time_index>.csv).
+
+    # Since load_smry() should also return the aggregation
+ # of the loaded smry data, we need to pick up this
+ # data and aggregate it.
+ self.process_batch(
+ batch=[
+ {
+ "load_smry": {
+ "column_keys": column_keys,
+ "time_index": time_index,
+ "cache_eclsum": cache_eclsum,
+ "start_date": start_date,
+ "end_date": end_date,
+ "include_restart": include_restart,
+ }
+ }
+ ]
+ )
if isinstance(time_index, (list, np.ndarray)):
time_index = "custom"
elif time_index is None:
time_index = "raw"
+ # Note the dependency that the load_smry() function in
+ # ScratchRealization will store to this key-name:
return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")
def get_volumetric_rates(self, column_keys=None, time_index=None):
@@ -921,12 +982,31 @@ def process_batch(self, batch=None):
ScratchEnsemble: This ensemble object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
- for realization in self.realizations.values():
- realization.process_batch(batch)
+ if use_concurrent():
+ with ProcessPoolExecutor() as executor:
+ real_indices = self.realizations.keys()
+ futures_reals = [
+ executor.submit(
+ real.process_batch, batch, excepts=(OSError, IOError)
+ )
+ for real in self.realizations.values()
+ ]
+ # Reassemble the realization dictionary from
+ # the pickled results of the ProcessPool:
+ self.realizations = {
+ r_idx: real
+ for (r_idx, real) in zip(
+ real_indices, [x.result() for x in futures_reals]
+ )
+ }
+ else:
+ for realization in self.realizations.values():
+ realization.process_batch(batch, excepts=(OSError, IOError))
+
return self
def apply(self, callback, **kwargs):
- """Callback functionalty, apply a function to every realization
+ """Callback functionality, apply a function to every realization
The supplied function handle will be handed over to
each underlying realization object. The function supplied
@@ -946,16 +1026,47 @@ def apply(self, callback, **kwargs):
pd.DataFrame, aggregated result of the supplied function
on each realization.
"""
- results = []
logger.info("Ensemble %s is running callback %s", self.name, str(callback))
- for realidx, realization in self.realizations.items():
- result = realization.apply(callback, **kwargs).copy()
- # (we took a copy since we are modifying it here:)
- # Todo: Avoid copy by concatenatint a dict of dataframes
- # where realization index is the dict keys.
- result["REAL"] = realidx
- results.append(result)
- return pd.concat(results, sort=False, ignore_index=True)
+
+ # It is tempting to just call process_batch() here, but then we
+ # don't know how to collect the results from this particular
+ # apply() operation (if we enforced nonempty localpath, we could)
+ # > kwargs["callback"] = callback
+    # > ens.process_batch(batch=[{"apply": kwargs}])  # (untested)
+ if use_concurrent():
+ with ProcessPoolExecutor() as executor:
+ real_indices = self.realizations.keys()
+ kwargs["excepts"] = (OSError, IOError)
+ futures_reals = [
+ executor.submit(real.apply, callback, **kwargs)
+ for real in self.realizations.values()
+ ]
+ # Reassemble a list of dataframes from the pickled results
+ # of the ProcessPool:
+ dframes_dict_from_apply = {
+ realidx: dframe
+ for (realidx, dframe) in zip(
+ real_indices, [x.result() for x in futures_reals]
+ )
+ }
+            # If localpath is an argument to the apply function, we not only
+            # need to return the aggregated data, but should also update
+            # the data dictionary of each realization in the ensemble object.
+ if "localpath" in kwargs:
+ for realidx, dataframe in dframes_dict_from_apply.items():
+ self.realizations[realidx].data[kwargs["localpath"]] = dataframe
+ dframes_from_apply = [
+ dframe.assign(REAL=realidx)
+ for (realidx, dframe) in dframes_dict_from_apply.items()
+ ]
+
+ else:
+ dframes_from_apply = [
+ realization.apply(callback, **kwargs).assign(REAL=realidx)
+ for (realidx, realization) in self.realizations.items()
+ ]
+ return pd.concat(dframes_from_apply, sort=False, ignore_index=True)
def get_smry_dates(
self,
@@ -1374,19 +1485,44 @@ def get_smry(
end_date=end_date,
include_restart=include_restart,
)
- dflist = []
- for index, realization in self.realizations.items():
- dframe = realization.get_smry(
- time_index=time_index,
- column_keys=column_keys,
- cache_eclsum=cache_eclsum,
- include_restart=include_restart,
- )
- dframe.insert(0, "REAL", index)
- dframe.index.name = "DATE"
- dflist.append(dframe)
- if dflist:
- return pd.concat(dflist, sort=False).reset_index()
+ dframes = []
+ if use_concurrent():
+ with ProcessPoolExecutor() as executor:
+ real_indices = self.realizations.keys()
+ # Note that we cannot use process_batch()
+ # here as we need dataframes in return, not
+ # realizations.
+ futures_dframes = [
+ executor.submit(
+ realization.get_smry,
+ time_index=time_index,
+ column_keys=column_keys,
+ cache_eclsum=cache_eclsum,
+ include_restart=include_restart,
+ )
+ for realization in self.realizations.values()
+ ]
+ # Reassemble a list of dataframes from the pickled results
+ # of the ProcessPool:
+ for realidx, dframe in zip(
+ real_indices, [x.result() for x in futures_dframes]
+ ):
+ dframes.append(dframe.assign(REAL=realidx).rename_axis("DATE"))
+ else:
+ # Sequential version:
+ for realidx, realization in self.realizations.items():
+ dframes.append(
+ realization.get_smry(
+ time_index=time_index,
+ column_keys=column_keys,
+ cache_eclsum=cache_eclsum,
+ include_restart=include_restart,
+ )
+ .assign(REAL=realidx)
+ .rename_axis("DATE")
+ )
+ if dframes:
+ return pd.concat(dframes, sort=False).reset_index()
return pd.DataFrame()
def get_eclgrid(self, props, report=0, agg="mean", active_only=False):
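
All the concurrent branches above follow the same submit-then-gather pattern: one task per realization is submitted to a ProcessPoolExecutor, and the pickled results are collected back into the ensemble, which is why callbacks and realization objects must be picklable. A self-contained sketch of that pattern, using a stand-in worker function instead of the real ScratchRealization/process_batch targets:

from concurrent.futures import ProcessPoolExecutor


def load_one(path):
    """Stand-in for e.g. ScratchRealization(path); must be defined at
    module level so that concurrent.futures can pickle it, and its
    return value must also be picklable to travel back from the worker."""
    return {"path": path, "status": "loaded"}


def load_many(paths):
    """Submit all tasks first, then collect results, keeping the pool busy."""
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(load_one, path) for path in paths]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(load_many(["realization-0/iter-0", "realization-1/iter-0"]))
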
diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py
index d748df06..07d0bba8 100644
--- a/src/fmu/ensemble/realization.py
+++ b/src/fmu/ensemble/realization.py
@@ -12,7 +12,7 @@
import copy
import glob
import json
-from datetime import datetime, date, time
+import datetime
import dateutil
import logging
@@ -78,10 +78,18 @@ class ScratchRealization(object):
should be run at time of initialization. Each element is a
length 1 dictionary with the function name to run as the key
and each keys value should be the function arguments as a dict.
+ find_files (list of str): Each element in this list will be given
+ to find_files() before any batch commands are processed.
"""
def __init__(
- self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None
+ self,
+ path,
+ realidxregexp=None,
+ index=None,
+ autodiscovery=True,
+ batch=None,
+ find_files=None,
):
self._origpath = os.path.abspath(path)
self.index = None
@@ -101,7 +109,7 @@ def __init__(
self.eclfiles = None # ecl2df.EclFiles object
self._eclsum = None # Placeholder for caching
- self._eclsum_include_restart = None # Flag for cached object
+ self._eclsum_include_restart = None
# The datastore for internalized data. Dictionary
# indexed by filenames (local to the realization).
@@ -163,12 +171,16 @@ def __init__(
if os.path.exists(os.path.join(abspath, "parameters.txt")):
self.load_txt("parameters.txt")
+ if find_files is not None:
+ for to_find in find_files:
+ self.find_files(to_find)
+
if batch:
self.process_batch(batch)
logger.info("Initialized %s", abspath)
- def process_batch(self, batch):
+ def process_batch(self, batch, excepts=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
@@ -180,6 +192,8 @@ def process_batch(self, batch):
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
+ excepts (tuple): Tuple of exceptions that are to be ignored in
+ each individual realization.
Returns:
ScratchRealization: This realization object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
@@ -209,7 +223,15 @@ def process_batch(self, batch):
logger.warning("process_batch skips illegal function: %s", fn_name)
continue
assert isinstance(cmd[fn_name], dict)
- getattr(self, fn_name)(**cmd[fn_name])
+ if excepts is None:
+ getattr(self, fn_name)(**cmd[fn_name])
+ else:
+ try:
+ getattr(self, fn_name)(**cmd[fn_name])
+ except excepts as exception:
+ logger.info(
+ "Ignoring exception in real %d: %s", self.index, str(exception)
+ )
return self
def runpath(self):
@@ -540,12 +562,14 @@ def load_status(self):
else:
try:
hms = list(map(int, jobrow["STARTTIME"].split(":")))
- start = datetime.combine(
- date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
+ start = datetime.datetime.combine(
+ datetime.date.today(),
+ datetime.time(hour=hms[0], minute=hms[1], second=hms[2]),
)
hms = list(map(int, jobrow["ENDTIME"].split(":")))
- end = datetime.combine(
- date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
+ end = datetime.datetime.combine(
+ datetime.date.today(),
+ datetime.time(hour=hms[0], minute=hms[1], second=hms[2]),
)
# This works also when we have crossed 00:00:00.
# Jobs > 24 h will be wrong.
@@ -918,6 +942,13 @@ def get_eclsum(self, cache=True, include_restart=True):
EclSum: object representing the summary file. None if
nothing was found.
"""
+ # pylint: disable=import-outside-toplevel
+ from .common import use_concurrent
+
+ if use_concurrent():
+ # In concurrent mode, caching is not used as
+ # we do not pickle the loaded EclSum objects
+ cache = False
if cache and self._eclsum: # Return cached object if available
if self._eclsum_include_restart == include_restart:
return self._eclsum
@@ -1121,7 +1152,7 @@ def _glob_smry_keys(self, column_keys):
keys = set()
for key in column_keys:
if isinstance(key, str):
- keys = keys.union(set(self._eclsum.keys(key)))
+ keys = keys.union(set(self.get_eclsum().keys(key)))
return list(keys)
def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None):
@@ -1135,6 +1166,8 @@ def get_smryvalues(self, props_wildcard=None):
"""
Fetch selected vectors from Eclipse Summary data.
+        NOTE: This function might face deprecation.
+
Args:
props_wildcard : string or list of strings with vector
wildcards
@@ -1142,24 +1175,23 @@ def get_smryvalues(self, props_wildcard=None):
a dataframe with values. Raw times from UNSMRY.
Empty dataframe if no summary file data available
"""
- if not self._eclsum: # check if it is cached
- self.get_eclsum()
-
- if not self._eclsum:
+ if not self.get_eclsum():
+ # Return empty, but do not store the empty dataframe in self.data
return pd.DataFrame()
props = self._glob_smry_keys(props_wildcard)
- if "numpy_vector" in dir(self._eclsum):
+ if "numpy_vector" in dir(self.get_eclsum()):
data = {
- prop: self._eclsum.numpy_vector(prop, report_only=False)
+ prop: self.get_eclsum().numpy_vector(prop, report_only=False)
for prop in props
}
else: # get_values() is deprecated in newer libecl
data = {
- prop: self._eclsum.get_values(prop, report_only=False) for prop in props
+ prop: self.get_eclsum().get_values(prop, report_only=False)
+ for prop in props
}
- dates = self._eclsum.get_dates(report_only=False)
+ dates = self.get_eclsum().get_dates(report_only=False)
return pd.DataFrame(data=data, index=dates)
def get_smry_dates(
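
For reference, the batch format consumed by process_batch() above is a list of one-key dictionaries, each mapping a method name to its keyword arguments, and the new excepts tuple simply wraps each dispatched call in a try/except. A stripped-down sketch of that dispatch loop, on a toy class rather than the real ScratchRealization:

class MiniRealization:
    """Toy stand-in illustrating the batch dispatch; not the real class."""

    def __init__(self, index):
        self.index = index

    def load_csv(self, localpath=None):
        # Pretend loading always fails, as a missing file would:
        raise IOError("No such file: {}".format(localpath))

    def process_batch(self, batch, excepts=None):
        for cmd in batch:
            fn_name, kwargs = next(iter(cmd.items()))
            if excepts is None:
                getattr(self, fn_name)(**kwargs)
            else:
                try:
                    getattr(self, fn_name)(**kwargs)
                except excepts as exception:
                    print("Ignored in real {}: {}".format(self.index, exception))
        return self


MiniRealization(0).process_batch(
    batch=[{"load_csv": {"localpath": "share/results/tables/missing.csv"}}],
    excepts=(OSError, IOError),
)
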
diff --git a/src/fmu/ensemble/virtualensemble.py b/src/fmu/ensemble/virtualensemble.py
index 994c93d2..0499b097 100644
--- a/src/fmu/ensemble/virtualensemble.py
+++ b/src/fmu/ensemble/virtualensemble.py
@@ -7,6 +7,7 @@
import fnmatch
import datetime
import logging
+import time
import yaml
import numpy as np
@@ -641,7 +642,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
lazy_load (bool): If True, loading of dataframes from disk
will be postponed until get_df() is actually called.
"""
- start_time = datetime.datetime.now()
+ start_time = time.time()
if fmt not in ["csv", "parquet"]:
raise ValueError("Unknown format for from_disk: %s" % fmt)
@@ -717,7 +718,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
# IT MIGHT BE INCORRECT IF LAZY_LOAD...
self.update_realindices()
- end_time = datetime.datetime.now()
+ end_time = time.time()
if lazy_load:
lazy_str = "(lazy) "
else:
@@ -725,7 +726,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
logger.info(
"Loading ensemble from disk %stook %g seconds",
lazy_str,
- (end_time - start_time).total_seconds(),
+ end_time - start_time,
)
def _load_frame_fromdisk(self, key, filename):
diff --git a/tests/test_batch.py b/tests/test_batch.py
index 273e13cd..5b44b569 100644
--- a/tests/test_batch.py
+++ b/tests/test_batch.py
@@ -2,10 +2,13 @@
import os
import logging
+import time
import yaml
+import pandas as pd
from fmu.ensemble import ScratchEnsemble, EnsembleSet
+from fmu.ensemble.common import use_concurrent, set_concurrent
logger = logging.getLogger(__name__)
@@ -78,3 +81,37 @@ def test_yaml():
assert "OK" in ensset.keys()
assert "npv.txt" in ensset.keys()
assert not ensset.get_df("unsmry--yearly").empty
+
+
+def sleeper():
+ """Sleeps for one second.
+
+ This function must be a module member for it to be
+    pickled in concurrent applications."""
+ time.sleep(1)
+ return pd.DataFrame()
+
+
+def test_speedup():
+ """Naive test of speedup in concurrent mode"""
+
+ testdir = os.path.dirname(os.path.abspath(__file__))
+ ens = ScratchEnsemble(
+ "reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0"
+ )
+
+ set_concurrent(True)
+ start_time = time.time()
+ ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
+ end_time = time.time()
+ conc_elapsed = end_time - start_time
+ print("FMU_CONCURRENCY: {}".format(use_concurrent()))
+ print("Elapsed time for concurrent batch apply sleep: {}".format(conc_elapsed))
+
+ set_concurrent(False)
+ start_time = time.time()
+ ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
+ end_time = time.time()
+ seq_elapsed = end_time - start_time
+ print("FMU_CONCURRENCY: {}".format(use_concurrent()))
+ print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed))
diff --git a/tests/test_ecl2df.py b/tests/test_ecl2df.py
index 0c30d13d..933b3e88 100644
--- a/tests/test_ecl2df.py
+++ b/tests/test_ecl2df.py
@@ -28,6 +28,18 @@ def test_ecl2df_real():
assert "KH" in compdat_df
+def extract_compdat(kwargs):
+ """Callback function to extract compdata data using ecl2df
+ on a ScratchRealization"""
+ eclfiles = kwargs["realization"].get_eclfiles()
+ if not eclfiles:
+ print(
+ "Could not obtain EclFiles object for realization "
+ + str(kwargs["realization"].index)
+ )
+ return ecl2df.compdat.deck2dfs(eclfiles.get_ecldeck())["COMPDAT"]
+
+
def test_reek():
"""Import the reek ensemble and apply ecl2df functions on
the realizations"""
@@ -41,17 +53,6 @@ def test_reek():
"reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0"
)
- def extract_compdat(kwargs):
- """Callback fnction to extract compdata data using ecl2df
- on a ScratchRealization"""
- eclfiles = kwargs["realization"].get_eclfiles()
- if not eclfiles:
- print(
- "Could not obtain EclFiles object for realization "
- + str(kwargs["realization"].index)
- )
- return ecl2df.compdat.deck2dfs(eclfiles.get_ecldeck())["COMPDAT"]
-
allcompdats = reekens.apply(extract_compdat)
assert not allcompdats.empty
assert 0 in allcompdats["REAL"]
@@ -59,6 +60,18 @@ def extract_compdat(kwargs):
# Pr. now, only realization-0 has eclipse/include in git
+def get_smry(kwargs):
+ """Callback function to extract smry data using ecl2df on a
+ ScratchRealization"""
+ eclfiles = kwargs["realization"].get_eclfiles()
+ return ecl2df.summary.df(
+ eclfiles,
+ time_index=kwargs["time_index"],
+ column_keys=kwargs["column_keys"],
+ datetime=True,
+ )
+
+
def test_smry_via_ecl2df():
"""Test that we could use ecl2df for smry extraction instead
of the native code inside fmu-ensemble.
@@ -66,17 +79,6 @@ def test_smry_via_ecl2df():
(This test code was made before fmu-ensemble used ecl2df by default)
"""
- def get_smry(kwargs):
- """Callback function to extract smry data using ecl2df on a
- ScratchRealization"""
- eclfiles = kwargs["realization"].get_eclfiles()
- return ecl2df.summary.df(
- eclfiles,
- time_index=kwargs["time_index"],
- column_keys=kwargs["column_keys"],
- datetime=True,
- )
-
if "__file__" in globals():
testdir = os.path.dirname(os.path.abspath(__file__))
else:
diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py
index 75552dd2..0d7a3f5d 100644
--- a/tests/test_ensemble.py
+++ b/tests/test_ensemble.py
@@ -13,6 +13,7 @@
from .test_ensembleset import symlink_iter
from fmu.ensemble import ScratchEnsemble, ScratchRealization
+from fmu.ensemble.common import use_concurrent
try:
@@ -325,8 +326,13 @@ def test_noautodiscovery():
)
# Default ensemble construction will include auto-discovery, check
# that we got that:
- assert not reekensemble.get_smry(column_keys="FOPT").empty
+ assert not reekensemble.load_smry(column_keys="FOPT").empty
assert "UNSMRY" in reekensemble.files["FILETYPE"].values
+    # (beware that get_smry() behaves differently depending on
+    # whether it is run concurrently or not: sequential execution
+    # will lead to the UNSMRY file being discovered, while in
+    # concurrent mode the realization object in which it was
+    # discovered is thrown away)
# Now try again, with no autodiscovery
reekensemble = ScratchEnsemble(
@@ -810,6 +816,14 @@ def test_ertrunpathfile():
# because ECLBASE is given in the runpathfile
assert sum(["UNSMRY" in x for x in ens.files["BASENAME"].unique()]) == 5
+ # Run once more to test runpathfilter:
+ ens = ScratchEnsemble(
+ "filtensfromrunpath",
+ runpathfile=testdir + "/data/ert-runpath-file",
+ runpathfilter="realization-3",
+ )
+ assert len(ens) == 1
+ assert ens[3].index == 3
os.chdir(cwd)
@@ -829,7 +843,10 @@ def test_nonexisting():
def test_eclsumcaching():
- """Test caching of eclsum"""
+ """Test caching of eclsum, but only if we don't use concurrency"""
+
+ if use_concurrent():
+ pytest.skip("Not testing caching when we use concurrency")
if "__file__" in globals():
# Easen up copying test code into interactive sessions
@@ -928,6 +945,26 @@ def test_read_eclgrid():
assert len(grid_df["i"]) == 35840
+def fipnum2zone():
+ """Helper function for injecting mocked frame into
+ each realization
+
+ This function must be global to the module for
+ concurrent.futures to able to pickle it.
+ """
+ return pd.DataFrame(
+ columns=["FIPNUM", "ZONE"],
+ data=[
+ [1, "UpperReek"],
+ [2, "MidReek"],
+ [3, "LowerReek"],
+ [4, "UpperReek"],
+ [5, "MidReek"],
+ [6, "LowerReek"],
+ ],
+ )
+
+
def test_get_df():
"""Test the data retrieval functionality
@@ -970,21 +1007,6 @@ def test_get_df():
# Inject a mocked dataframe to the realization, there is
# no "add_data" API for ensembles, but we can use the apply()
# functionality
- def fipnum2zone():
- """Helper function for injecting mocked frame into
- each realization"""
- return pd.DataFrame(
- columns=["FIPNUM", "ZONE"],
- data=[
- [1, "UpperReek"],
- [2, "MidReek"],
- [3, "LowerReek"],
- [4, "UpperReek"],
- [5, "MidReek"],
- [6, "LowerReek"],
- ],
- )
-
ens.apply(fipnum2zone, localpath="fipnum2zone")
volframe = ens.get_df("simulator_volume_fipnum", merge="fipnum2zone")
@@ -1001,6 +1023,22 @@ def fipnum2zone():
assert "npv.txt" in vol_npv
+# Function to be given to apply() as picklable callback:
+def ex_func1():
+ """Example function that will return a constant dataframe"""
+ return pd.DataFrame(index=["1", "2"], columns=["foo", "bar"], data=[[1, 2], [3, 4]])
+
+
+# Function to be given to apply() as picklable callback
+def rms_vol2df(kwargs):
+ """Example function for bridging with fmu.tools to parse volumetrics"""
+ fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
+ # The supplied callback should not fail too easy.
+ if os.path.exists(fullpath):
+ return volumetrics.rmsvolumetrics_txt2df(fullpath)
+ return pd.DataFrame()
+
+
def test_apply(tmpdir):
"""
Test the callback functionality
@@ -1017,12 +1055,6 @@ def test_apply(tmpdir):
ens = ScratchEnsemble("reektest", "realization-*/iter-0")
- def ex_func1():
- """Example function that will return a constant dataframe"""
- return pd.DataFrame(
- index=["1", "2"], columns=["foo", "bar"], data=[[1, 2], [3, 4]]
- )
-
result = ens.apply(ex_func1)
assert isinstance(result, pd.DataFrame)
assert "REAL" in result.columns
diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py
index b002ffe3..658219f3 100644
--- a/tests/test_ensembleset.py
+++ b/tests/test_ensembleset.py
@@ -37,6 +37,15 @@ def symlink_iter(origensdir, newitername):
)
+def rms_vol2df(kwargs):
+ """Callback function to be sent to ensemble objects"""
+ fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
+    # The supplied callback should not fail too easily.
+ if os.path.exists(fullpath):
+ return volumetrics.rmsvolumetrics_txt2df(fullpath)
+ return pd.DataFrame()
+
+
def test_ensembleset_reek001(tmpdir):
"""Test import of a stripped 5 realization ensemble,
manually doubled to two identical ensembles
@@ -202,15 +211,7 @@ def test_ensembleset_reek001(tmpdir):
assert len(ensset3.keys()) == predel_len - 1
# Test callback functionality, that we can convert rms
- # volumetrics in each realization. First we need a
- # wrapper which is able to work on ScratchRealizations.
- def rms_vol2df(kwargs):
- """Callback function to be sent to ensemble objects"""
- fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
- # The supplied callback should not fail too easy.
- if os.path.exists(fullpath):
- return volumetrics.rmsvolumetrics_txt2df(fullpath)
- return pd.DataFrame()
+ # volumetrics in each realization.
if not SKIP_FMU_TOOLS:
rmsvols_df = ensset3.apply(
diff --git a/tests/test_webviz_subsurface_testdata.py b/tests/test_webviz_subsurface_testdata.py
new file mode 100644
index 00000000..4e4c40d6
--- /dev/null
+++ b/tests/test_webviz_subsurface_testdata.py
@@ -0,0 +1,115 @@
+"""Testing loading the webviz subsurface testdata."""
+
+import os
+import time
+
+import pytest
+
+from fmu.ensemble import ScratchEnsemble
+from fmu.ensemble.common import use_concurrent, set_concurrent
+
+
+def check_testdata():
+ """Check if we have webviz subsurface testdata, skip if not"""
+ testdir = os.path.dirname(os.path.abspath(__file__))
+ if not os.path.exists(os.path.join(testdir, "data/webviz-subsurface-testdata")):
+ print("Skipping loading webviz-subsurface-testdata")
+ print("Do")
+ print(" $ cd tests/data")
+ print(
+ " $ git clone --depth 1 "
+ "https://github.com/equinor/webviz-subsurface-testdata"
+ )
+ print("to download and use with pytest")
+ pytest.skip()
+
+
+def test_webviz_subsurface_testdata():
+ """Check that we can load the webviz subsurface testdata"""
+
+ check_testdata()
+ set_concurrent(True)
+
+ if "__file__" in globals():
+ # Easen up copying test code into interactive sessions
+ testdir = os.path.dirname(os.path.abspath(__file__))
+ else:
+ testdir = os.path.abspath(".")
+
+ ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
+ ens = ScratchEnsemble("reek_fullmatrix", ensdir + "realization-*/iter-0")
+
+ smry_monthly = ens.load_smry()
+ assert "REAL" in smry_monthly
+ assert len(smry_monthly["REAL"].unique()) == 40
+
+ ens.load_csv("share/results/tables/relperm.csv")
+ ens.load_csv("share/results/tables/equil.csv")
+ ens.load_csv("share/results/tables/rft.csv")
+ ens.load_csv("share/results/tables/pvt.csv")
+
+ ens.load_csv("share/results/volumes/simulator_volume_fipnum.csv")
+ ens.load_csv("share/results/volumes/geogrid--oil.csv")
+ ens.load_csv("share/results/volumes/simgrid--oil.csv")
+
+ assert len(ens.keys()) == 11
+
+
+def _do_load_webviz_subsurface_testdata_batch():
+ """Example workload for testing concurrency efficiency"""
+ testdir = os.path.dirname(os.path.abspath(__file__))
+ ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
+ batch_cmds = [
+ {"load_smry": {"column_keys": "*", "time_index": "yearly"}},
+ {"load_smry": {"column_keys": "*", "time_index": "monthly"}},
+ {"load_smry": {"column_keys": "*", "time_index": "last"}},
+ {"load_smry": {"column_keys": "*", "time_index": "daily"}},
+ {"load_csv": {"localpath": "share/results/tables/relperm.csv"}},
+ {"load_csv": {"localpath": "share/results/tables/equil.csv"}},
+ {"load_csv": {"localpath": "share/results/tables/rft.csv"}},
+ {"load_csv": {"localpath": "share/results/tables/pvt.csv"}},
+ {
+ "load_csv": {
+ "localpath": "share/results/volumes/simulator_volume_fipnum.csv"
+ }
+ },
+ {"load_csv": {"localpath": "share/results/volumes/geogrid--oil.csv"}},
+ {"load_csv": {"localpath": "share/results/volumes/simgrid--oil.csv"}},
+ ]
+ ens = ScratchEnsemble(
+ "reek_fullmatrix", ensdir + "realization-*/iter-0", batch=batch_cmds
+ )
+ assert len(ens.keys()) == 3 + len(batch_cmds) # 3 more than length of batch
+
+
+def test_webviz_subsurface_testdata_batch():
+ """Check that we can load the webviz subsurface testdata in batch
+
+    Also display timings; these should hopefully reveal that concurrent
+    operations are actually faster.
+ """
+
+ check_testdata()
+ start_time = time.time()
+ _do_load_webviz_subsurface_testdata_batch()
+ end_time = time.time()
+ elapsed = end_time - start_time
+ print("FMU_CONCURRENCY: {}".format(use_concurrent()))
+ print("Elapsed time for batch ensemble initialization: {}".format(elapsed))
+
+
+def test_webviz_subsurface_testdata_sequential_batch():
+ """Check that we can load the webviz subsurface testdata in batch
+
+ Also display timings, this should hopefully reveal that concurrent operations
+ are actually faster.
+ """
+
+ check_testdata()
+ set_concurrent(False)
+ start_time = time.time()
+ _do_load_webviz_subsurface_testdata_batch()
+ end_time = time.time()
+ elapsed = end_time - start_time
+ print("FMU_CONCURRENCY: {}".format(use_concurrent()))
+ print("Elapsed time for batch ensemble initialization: {}".format(elapsed))