WIP: Concurrent loading of ensemble #77

Closed · wants to merge 24 commits

Commits
f7abe42
Concurrent loading of ensemble
wouterjdb Dec 9, 2019
06910ed
Removed some spaces
wouterjdb Dec 10, 2019
37cbc42
Remove double code, changed camel case
wouterjdb Dec 10, 2019
eea4314
black
wouterjdb Dec 10, 2019
d16f525
Leaner writing of functional calls
wouterjdb Dec 10, 2019
24492f9
Small change in documentation
wouterjdb Dec 10, 2019
ac061e8
Added concurrent to load_file()
wouterjdb Dec 10, 2019
983e927
Fix static realidxregexp and autodiscovery
wouterjdb Dec 10, 2019
5010057
Added index, minor documentation change
wouterjdb Dec 10, 2019
f2565fb
Limit concurrent.futures import to ProcessPollExecutor
wouterjdb Dec 10, 2019
184cea7
Minor documentation changes
wouterjdb Dec 10, 2019
2b49f8b
Fixed missing positional argument names
wouterjdb Dec 10, 2019
58d55cf
Changed argument creation for concurrent
wouterjdb Dec 10, 2019
08fac79
Fix for concurrent not updating self
wouterjdb Dec 10, 2019
3904d21
Concurrent to load_smry
wouterjdb Dec 10, 2019
28e9870
Alternative way to test Python version
wouterjdb Dec 11, 2019
345ce26
removing caching for parallel
wouterjdb Dec 11, 2019
7ac23bb
Removed unused code, black
wouterjdb Dec 11, 2019
57a3ccb
Minor code fixes
wouterjdb Dec 11, 2019
cc8df36
Fix Python2.7 cach removal
wouterjdb Dec 11, 2019
32e9ec8
Added pylint disable W0212
wouterjdb Dec 11, 2019
8e76bfd
rm wrapper for ScratchRealization.__init__
berland Dec 11, 2019
e0ac3e7
Minor code fixes + black
wouterjdb Dec 12, 2019
5909079
Changed i to idx because of pylint complaining
wouterjdb Dec 12, 2019
240 changes: 195 additions & 45 deletions src/fmu/ensemble/ensemble.py
@@ -6,11 +6,19 @@
from __future__ import division
from __future__ import print_function

import sys
import re
import os
import glob
from datetime import datetime

if sys.version_info >= (3, 2):
from concurrent.futures import ProcessPoolExecutor

USE_CONCURRENT = True
else:
USE_CONCURRENT = False

import six
import pandas as pd
import numpy as np
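A note on the version gate above: concurrent.futures is only available from Python 3.2 onwards, so the USE_CONCURRENT flag keeps the module importable on Python 2.7 with a serial fallback. Note that ProcessPoolExecutor runs work in separate processes, not threads. Below is a minimal sketch of the same feature-detection pattern; the load_all helper is illustrative and not part of the PR:

```python
import sys

if sys.version_info >= (3, 2):
    from concurrent.futures import ProcessPoolExecutor

    USE_CONCURRENT = True
else:
    USE_CONCURRENT = False


def load_all(func, items):
    """Apply func to every item, concurrently where the interpreter allows."""
    if USE_CONCURRENT:
        # Separate worker processes, so func and items must be picklable.
        with ProcessPoolExecutor() as executor:
            return list(executor.map(func, items))
    return [func(item) for item in items]  # Python 2.7 fallback: serial loop
```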
@@ -205,6 +213,34 @@ def _shortcut2path(keys, shortpath):
# calling function handle further errors.
return shortpath

def _check_loading_of_realization(self, realization, realdir, realidxregexp):
"""Helper function for checking and logging the results of loading a
realization. Successfully loaded realizations will be added to
self._realizations.

Args:
realization (ScratchRealization): Initialized ScratchRealization
realdir (str): directory to realization
realidxregexp (str): Regular expression or None

Returns:
int: either 0 on fail, or 1 on success
"""
return_value = 0
if realization.index is None:
logger.critical(
"Could not determine realization index " + "for path " + realdir
)
if not realidxregexp:
logger.critical("Maybe you need to supply a regexp.")
else:
logger.critical("Your regular expression is maybe wrong.")
else:
self._realizations[realization.index] = realization
return_value = 1

return return_value

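The helper above lets the serial and concurrent branches of add_realizations() share a single success/failure bookkeeping path. For readers unfamiliar with the realidxregexp argument it logs about, here is a hypothetical example of extracting a realization index from a path; the directory layout is an assumption, not taken from the PR:

```python
import re

# Assumed layout: /scratch/<case>/realization-<N>/iter-<M>
realidxregexp = re.compile(r"realization-(\d+)")

match = realidxregexp.search("/scratch/mycase/realization-3/iter-0")
index = int(match.group(1)) if match else None  # None hits logger.critical()
print(index)  # -> 3
```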
def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
"""Utility function to add realizations to the ensemble.

@@ -215,9 +251,13 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
This function passes on initialization to ScratchRealization
and stores a reference to those generated objects.

This function will use multiple processes when running in Python 3.2+,
via the concurrent.futures module.

Args:
paths (list/str): String or list of strings with wildcards
to file system. Absolute or relative paths.
realidxregexp (str): Regular expression or None
autodiscovery (boolean): whether files can be attempted
auto-discovered

@@ -234,21 +274,31 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
globbedpaths = glob.glob(paths)

count = 0
for realdir in globbedpaths:
realization = ScratchRealization(
realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
)
if realization.index is None:
logger.critical(
"Could not determine realization index " + "for path " + realdir
if USE_CONCURRENT:
with ProcessPoolExecutor() as executor:
realfutures = [
executor.submit(
ScratchRealization,
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
)
for realdir in globbedpaths
]

for idx, realfuture in enumerate(realfutures):
count += self._check_loading_of_realization(
realfuture.result(), globbedpaths[idx], realidxregexp
)
if not realidxregexp:
logger.critical("Maybe you need to supply a regexp.")
else:
logger.critical("Your regular expression is maybe wrong.")
else:
count += 1
self._realizations[realization.index] = realization
else:
for realdir in globbedpaths:
realization = ScratchRealization(
realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
)
count += self._check_loading_of_realization(
realization, realdir, realidxregexp
)

logger.info("add_realizations() found %d realizations", len(self._realizations))
return count

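In the concurrent branch above, executor.submit() schedules one ScratchRealization construction per globbed directory and returns a Future for each; keeping the futures in list order is what lets realfuture.result() be matched back to globbedpaths[idx]. A self-contained sketch of that submit/result pattern, where the parse worker and paths are invented for illustration:

```python
from concurrent.futures import ProcessPoolExecutor


def parse(path):
    """Stand-in for the ScratchRealization constructor."""
    return path.upper()


if __name__ == "__main__":  # required for process pools on spawn platforms
    paths = ["real-0", "real-1", "real-2"]
    with ProcessPoolExecutor() as executor:
        # submit() returns a Future at once; result() blocks until that job
        # finishes and re-raises any exception from the worker.
        futures = [executor.submit(parse, path) for path in paths]
        for path, future in zip(paths, futures):
            print(path, "->", future.result())
```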
@@ -469,10 +519,49 @@ def load_csv(self, localpath, convert_numeric=True, force_reread=False):
"""
return self.load_file(localpath, "csv", convert_numeric, force_reread)

@staticmethod
def _load_file(
realization, localpath, fformat, convert_numeric, force_reread, index
):
"""Wrapper function to be used for parallel loading of files

Args:
realization: Single realization
localpath (str): path to the text file, relative to each realization
fformat (str): string identifying the file format. Supports 'txt'
and 'csv'.
convert_numeric (boolean): If set to True, numerical columns
will be searched for and have their dtype set
to integers or floats. If scalars, only numerical
data will be loaded.
force_reread (boolean): Force reread from file system. If
False, repeated calls to this function will
return cached results.
index (int): realization index

Returns:
realization with loaded file.

"""
try:
realization.load_file(localpath, fformat, convert_numeric, force_reread)
except ValueError:
# This would at least occur for unsupported fileformat,
# and that we should not skip.
logger.critical("load_file() failed in realization %d", index)
raise ValueError
except IOError:
# At ensemble level, we allow files to be missing in
# some realizations
logger.warning("Could not read %s for realization %d", localpath, index)

return realization

def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False):
"""Function for calling load_file() in every realization

This function may utilize multithreading.
This function will use multiple processes when running in Python 3.2+,
via the concurrent.futures module.

Args:
localpath (str): path to the text file, relative to each realization
@@ -487,22 +576,39 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False):
return cached results.
Returns:
pd.Dataframe: with loaded data aggregated. Column 'REAL'
distuinguishes each realizations data.
distinguishes each realizations data.

"""
for index, realization in self._realizations.items():
try:
realization.load_file(localpath, fformat, convert_numeric, force_reread)
except ValueError:
# This would at least occur for unsupported fileformat,
# and that we should not skip.
logger.critical("load_file() failed in realization %d", index)
raise ValueError
except IOError:
# At ensemble level, we allow files to be missing in
# some realizations
logger.warning("Could not read %s for realization %d", localpath, index)
if USE_CONCURRENT:
args = []
for index, realization in self._realizations.items():
args.append(
(
realization,
localpath,
fformat,
convert_numeric,
force_reread,
index,
)
)
with ProcessPoolExecutor() as executor:
for realization in executor.map(self._load_file, *zip(*args)):
self._realizations[realization.index] = realization
else:
for index, realization in self._realizations.items():
self._load_file(
realization,
localpath,
fformat,
convert_numeric,
force_reread,
index,
)

if self.get_df(localpath).empty:
raise ValueError("No ensemble data found for %s", localpath)

return self.get_df(localpath)

def find_files(self, paths, metadata=None, metayaml=False):
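executor.map() expects one iterable per positional parameter of the mapped function, which is why the per-realization argument tuples built above are transposed with *zip(*args) at the call site. A small runnable demonstration of the same transposition; the describe function and its values are made up:

```python
from concurrent.futures import ProcessPoolExecutor


def describe(index, path, flag):
    return "%d: %s (%s)" % (index, path, flag)


if __name__ == "__main__":
    args = [(0, "data.csv", True), (1, "data.csv", True)]
    # zip(*args) turns row-wise tuples into one column per parameter:
    # (0, 1), ("data.csv", "data.csv"), (True, True)
    with ProcessPoolExecutor() as executor:
        for result in executor.map(describe, *zip(*args)):
            print(result)  # results arrive in input order
```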
@@ -618,6 +724,7 @@ def get_df(self, localpath):
# No logging here, those error messages
# should have appeared at construction using load_*()
pass

if dflist:
# Merge a dictionary of dataframes. The dict key is
# the realization index, and end up in a MultiIndex
@@ -628,6 +735,27 @@
else:
raise ValueError("No data found for " + localpath)

@staticmethod
def _load_smry(
realidx,
realization,
time_index,
column_keys,
start_date,
end_date,
include_restart,
):
logger.info("Loading smry from realization %s", realidx)
realization.load_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=False,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
return realization

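Making _load_smry a @staticmethod that receives the realization and index explicitly matters for process pools: the callable and its arguments are pickled into the workers, and a bound method would additionally drag the whole ensemble object (self) across that boundary. A minimal sketch of the difference, with illustrative class and method names:

```python
import pickle


class Loader:
    @staticmethod
    def work(item):  # no self: only `item` must be picklable
        return item * 2

    def work_bound(self, item):  # bound method: pickling includes self
        return item * 2


pickle.dumps(Loader.work)          # pickled by reference, cheap
pickle.dumps(Loader().work_bound)  # also serializes the Loader instance
```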
def load_smry(
self,
time_index="raw",
@@ -697,21 +825,39 @@ def load_smry(
"""
if not stacked:
raise NotImplementedError
# Future: Multithread this!
for realidx, realization in self._realizations.items():
# We do not store the returned DataFrames here,
# instead we look them up afterwards using get_df()
# Downside is that we have to compute the name of the
# cached object as it is not returned.
logger.info("Loading smry from realization %s", realidx)
realization.load_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)

if USE_CONCURRENT:
args = []
for realidx, realization in self._realizations.items():
args.append(
(
realidx,
realization,
time_index,
column_keys,
start_date,
end_date,
include_restart,
)
)
with ProcessPoolExecutor() as executor:
for realization in executor.map(self._load_smry, *zip(*args)):
self._realizations[realization.index] = realization
else:
for realidx, realization in self._realizations.items():
# We do not store the returned DataFrames here,
# instead we look them up afterwards using get_df()
# Downside is that we have to compute the name of the
# cached object as it is not returned.
logger.info("Loading smry from realization %s", realidx)
realization.load_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
if isinstance(time_index, list):
time_index = "custom"
return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")
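A subtlety the loops above handle correctly: each worker process mutates a pickled copy, so in-place changes never reach the parent. The returned realization therefore has to be written back into self._realizations, which is what commit 08fac79, "Fix for concurrent not updating self", addresses. A sketch of the pitfall and the fix, using an illustrative records structure:

```python
from concurrent.futures import ProcessPoolExecutor


def fill(record):
    record["loaded"] = True  # mutates the worker's *copy* only
    return record


if __name__ == "__main__":
    records = {0: {"loaded": False}, 1: {"loaded": False}}
    with ProcessPoolExecutor() as executor:
        for idx, result in zip(records, executor.map(fill, records.values())):
            records[idx] = result  # without this, the parent never sees it
    print(records)  # {0: {'loaded': True}, 1: {'loaded': True}}
```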
@@ -907,6 +1053,8 @@ def get_smry_dates(
Returns:
list of datetimes. Empty list if no data found.
"""
if USE_CONCURRENT:
cache_eclsum = False

# Build list of list of eclsum dates
eclsumsdates = []
@@ -1011,6 +1159,7 @@ def _get_smry_dates(eclsumsdates, freq, normalize, start_date, end_date):
datetimes = [start_date] + datetimes
if end_date and end_date not in datetimes:
datetimes = datetimes + [end_date]

return datetimes

def get_smry_stats(
Expand Down Expand Up @@ -1095,7 +1244,7 @@ def get_smry_stats(

return pd.concat(dframes, names=["STATISTIC"], sort=False)

def get_wellnames(self, well_match=None):
def get_wellnames(self, well_match=None, cache=True):
"""
Return a union of all Eclipse Summary well names
in all realizations (union). In addition, can return a list
Expand All @@ -1104,6 +1253,7 @@ def get_wellnames(self, well_match=None):
Args:
well_match: `Optional`. String (or list of strings)
with wildcard filter. If None, all wells are returned
cache (bool): `Optional`. Bool to set caching or not.
Returns:
list of strings with eclipse well names. Empty list if no
summary file or no matched well names.
Expand All @@ -1113,7 +1263,7 @@ def get_wellnames(self, well_match=None):
well_match = [well_match]
result = set()
for _, realization in self._realizations.items():
eclsum = realization.get_eclsum()
eclsum = realization.get_eclsum(cache=cache)
if eclsum:
if well_match is None:
result = result.union(set(eclsum.wells()))
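Finally, the new cache argument on get_wellnames() gives callers control over EclSum caching, in line with the concurrent paths above: get_smry_dates() forces cache_eclsum = False when USE_CONCURRENT is set, presumably because the cached low-level summary handles wrap C objects that cannot be pickled across process boundaries. A hypothetical usage sketch; the paths and well pattern are illustrative:

```python
from fmu.ensemble import ScratchEnsemble

ensemble = ScratchEnsemble("mycase", "/scratch/mycase/realization-*/iter-0")

# Skip EclSum caching to keep memory flat on large ensembles;
# cache=True (the default) preserves the previous behaviour.
op_wells = ensemble.get_wellnames(well_match="OP_*", cache=False)
print(op_wells)
```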