Concurrency implementation using batch processor #106
Conversation
Force-pushed from `2221324` to `8e8b029`
Code runs through the tests. Using concurrency on the test workloads leads to a slow-down: the penalty of sending dataframes back and forth between Python processes outweighs the benefit from concurrency.
Self-assessment: there is no concurrent code in EnsembleSet. Parallelizing this has very little potential, assuming there are always more realizations in an ensemble than ensembles in an ensemble set.
FlowNet might be an exception to that assumption, @anders-kiaer?
I actually think @wouterjdb did some tests on concurrency earlier, using …
This holds for FlowNet just as well.
Force-pushed from `7a484fc` to `c2da73d`
This is now potentially done.
Force-pushed from `76ccda8` to `1759358`

Force-pushed from `4f7cfbb` to `56934c4`
Looks good 👍 Left some comments.
Force-pushed from `4555268` to `6ef5556`
```python
loaded_reals = [
    executor.submit(
        ScratchRealization,
        realdir,
        realidxregexp=realidxregexp,
        autodiscovery=autodiscovery,
        batch=batch,
    ).result()
    for realdir in globbedpaths
]
```
Not really concurrent, as the `.result()` call waits for the subprocess to finish on each iteration. Should rather do something like below (which is how it is done in most of the rest of this PR, so this is probably just something you forgot to fix).
Suggested change:

```python
reals_futures = [
    executor.submit(
        ScratchRealization,
        realdir,
        realidxregexp=realidxregexp,
        autodiscovery=autodiscovery,
        batch=batch,
    )
    for realdir in globbedpaths
]
loaded_reals = [x.result() for x in reals_futures]
```
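To make the difference concrete, here is a minimal sketch (not from the PR; `slow_task` is a stand-in for the heavy per-realization load, and `ThreadPoolExecutor` is used only so the example is self-contained — the same reasoning applies to `ProcessPoolExecutor`):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task(x):
    """Stand-in for a heavy per-realization load."""
    time.sleep(0.2)
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    # Anti-pattern: .result() inside the comprehension blocks before
    # the next task is submitted, so the tasks run one after another.
    start = time.perf_counter()
    serial = [executor.submit(slow_task, i).result() for i in range(4)]
    serial_time = time.perf_counter() - start

    # Fixed pattern: submit all tasks first, then collect the results.
    start = time.perf_counter()
    futures = [executor.submit(slow_task, i) for i in range(4)]
    concurrent = [f.result() for f in futures]
    concurrent_time = time.perf_counter() - start

print(serial == concurrent)           # True: same results either way
print(serial_time > concurrent_time)  # True: roughly 4x slower serially
```

With four workers the fixed pattern finishes in about one task's duration, while the anti-pattern takes the sum of all task durations.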
`src/fmu/ensemble/ensemble.py` (outdated diff)
```python
with ProcessPoolExecutor() as executor:
    loaded_reals = [
        executor.submit(
            ScratchRealization,
            row.runpath,
            index=int(row.index),
            autodiscovery=False,
            find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
            batch=batch,
        ).result()
        for row in runpath_df.itertuples()
    ]
```
Same as above
Suggested change:

```python
with ProcessPoolExecutor() as executor:
    reals_futures = [
        executor.submit(
            ScratchRealization,
            row.runpath,
            index=int(row.index),
            autodiscovery=False,
            find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
            batch=batch,
        )
        for row in runpath_df.itertuples()
    ]
    loaded_reals = [x.result() for x in reals_futures]
```
```python
if use_concurrent():
    # In concurrent mode, caching is not used as
    # we do not pickle the loaded EclSum objects
    cache = False
```
This change is significant. Each call to `ScratchRealization.get_eclsum()` is quite heavy, and many methods are written with several calls to `get_eclsum()`, such that the loss of caching might actually increase runtime even with concurrency.
Also tested setting `lazy_loading=True` in the `EclSum` construction in `get_eclsum` to reduce the cost of each `get_eclsum()` call, but that was not a game changer (in some cases slower).
As I see it, all methods that currently run `get_eclsum()` somewhere in their dependencies are not safe to merge as-is, without the risk of a performance decrease for some users.
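The caching being lost can be illustrated with a minimal sketch (hypothetical: `Realization`, `_load_eclsum`, and `load_count` are illustrative names, not the fmu-ensemble API — the point is only that a cached heavy loader runs once, while `cache=False` re-runs it on every call):

```python
class Realization:
    """Toy model of a realization with a cached heavy loader."""

    def __init__(self, path):
        self._path = path
        self._eclsum_cache = None
        self.load_count = 0  # for demonstration only

    def _load_eclsum(self):
        # Stand-in for the expensive EclSum construction.
        self.load_count += 1
        return {"path": self._path}

    def get_eclsum(self, cache=True):
        # Reuse the cached object when allowed, otherwise reload.
        if cache and self._eclsum_cache is not None:
            return self._eclsum_cache
        eclsum = self._load_eclsum()
        if cache:
            self._eclsum_cache = eclsum
        return eclsum

real = Realization("/scratch/real-0")
real.get_eclsum()
real.get_eclsum()
print(real.load_count)  # 1 with caching; would be 2 with cache=False
```

With `cache=False` forced by the concurrent mode, every method that calls the loader pays the full cost again, which is exactly the concern raised above.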
Can one of the admins verify this patch?
Rebased on top of #182
* Batch processing after init on ensembles
* Functionality for turning off concurrency
* Concurrent apply()
* Parallelize add_from_runpathfile
* Allow running find_files at init of realizations
* Parallelize get_smry()
```python
    str(env_var) == "0"
    or str(env_var).lower() == "false"
    or str(env_var).lower() == "no"
):
```
Maybe this looks better and reduces the usage of `str(..)` and `.lower()`?

```python
env_var = str(os.environ[ENV_NAME]).lower()
if env_var == "0" or env_var == "false" or env_var == "no":
    ...
```
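The whole toggle could be sketched as follows (hypothetical: the actual value of `ENV_NAME` and the default-on behavior are assumptions for illustration, not taken from the PR):

```python
import os

ENV_NAME = "FMU_CONCURRENCY"  # assumed variable name, for illustration

def use_concurrent():
    """Return False when the env var is set to 0/false/no (any case)."""
    if ENV_NAME not in os.environ:
        return True  # assumption: concurrency defaults to on
    env_var = str(os.environ[ENV_NAME]).lower()
    return env_var not in ("0", "false", "no")

os.environ[ENV_NAME] = "No"
print(use_concurrent())  # False

os.environ[ENV_NAME] = "1"
print(use_concurrent())  # True
```

Using a membership test against a tuple keeps all accepted "off" spellings in one place, which is the simplification suggested above.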
Superseded by #206
Very much work-in-progress.